Apache Spark - Can't zip RDDs with unequal numbers of partitions
I have 3 RDDs like this:

rdd1: 1 2 3 4 5 6 7 8 9 10
rdd2: 11 12 13 14
rdd3: 15 16 17 18 19 20

I want to do this:

    rdd1.zip(rdd2.union(rdd3))

and I want the result to be:

1 2 11 12 3 4 13 14 5 6 15 16 7 8 17 18 9 10 19 20

but I get this exception:
    Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
Someone told me I can avoid the exception with:

    rdd1.zip(rdd2.union(rdd3).repartition(1))

but this seems to come at some cost. I want to know if there are other ways to solve the problem.
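For contrast, the element-wise pairing the question is after works with no constraints on plain Scala collections. This minimal sketch (no Spark involved; local `Seq` values stand in for the RDDs) shows that the exception is purely about partitioning, not about the data itself:

```scala
// Local stand-ins for rdd1, rdd2, and rdd3, using the values from the question.
val rdd1 = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd2 = Seq(11, 12, 13, 14)
val rdd3 = Seq(15, 16, 17, 18, 19, 20)

// Plain collections have no partitions, so zip always pairs element-wise:
// (1,11), (2,12), ..., (10,20)
val zipped = rdd1.zip(rdd2 ++ rdd3)
println(zipped.mkString(" "))
```

RDD.zip, by contrast, requires both RDDs to have the same number of partitions *and* the same number of elements per partition, which a union of two differently-partitioned RDDs generally will not satisfy.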
I'm not sure what you mean by "cost", but you are right to suspect that repartition(1) is not the right solution. It repartitions the RDD into a single partition:

- If the data does not fit on a single machine, it will fail.
- It only works because rdd1 has a single partition. When you have more data, that will no longer hold.
- repartition performs a shuffle, so the data can end up ordered differently.

I think the right solution is to give up on using zip, because you cannot ensure that the partitioning will match up. Create a key and use join instead:
    val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
    val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
    val offset = rdd2.count
    val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
    val combined = indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
      case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
    }

This will work no matter the partitioning. If you like, you can sort the result and remove the index at the end:
    val unindexed = combined.sortByKey().values
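The indexing-and-join scheme above can be sanity-checked locally. This is a sketch with plain Scala collections (no Spark runtime; a `Map` lookup stands in for `leftOuterJoin`, and `size` stands in for `count`), using the sample values from the question:

```scala
val rdd1 = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd2 = Seq(11, 12, 13, 14)
val rdd3 = Seq(15, 16, 17, 18, 19, 20)

// Key each element by its index; offset rdd3's indices by rdd2's size so
// that rdd2 followed by rdd3 occupy one contiguous index space 0..9.
val indexed1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val map2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }.toMap
val offset = rdd2.size
val map3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }.toMap

// leftOuterJoin equivalent: look each key up in map2, falling back to map3.
val combined = indexed1.map { case (i, v1) =>
  i -> (v1, map2.get(i).getOrElse(map3(i)))
}

// sortByKey().values equivalent: order by key, then drop the key.
val unindexed = combined.sortBy(_._1).map(_._2)
println(unindexed.mkString(" "))
```

The fallback mirrors `v2Opt.getOrElse(v3Opt.get)` in the Spark version: indices 0 to 3 resolve from rdd2's map, indices 4 to 9 from rdd3's, so the result pairs 1..10 with 11..20 regardless of how the inputs were split.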