Apache Spark - Can't zip RDDs with unequal numbers of partitions
I now have 3 RDDs like this:
rdd1:
1 2 3 4 5 6 7 8 9 10
rdd2:
11 12 13 14
rdd3:
15 16 17 18 19 20
and I want to do this:
rdd1.zip(rdd2.union(rdd3))
and I want the result to be this:
1 2 11 12 3 4 13 14 5 6 15 16 7 8 17 18 9 10 19 20
but I get this exception:
Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
Someone told me I can avoid the exception with:
rdd1.zip(rdd2.union(rdd3).repartition(1))
but that seems to carry some cost. I want to know if there are other ways to solve the problem.
I'm not sure what you mean by "cost", but you're right to suspect that repartition(1) is not the right solution. It repartitions the RDD to a single partition.
- If your data does not fit on a single machine, it will fail.
- It only works if rdd1 also has a single partition. When you have more data, this will no longer hold.
- repartition performs a shuffle, so your data can end up ordered differently.
I think the right solution is to give up on using zip, because you cannot ensure that the partitionings will match up. Create a key and use join instead:
val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
val combined = indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
  case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
}
This will work no matter what the partitioning is. If you like, you can sort the result and remove the index at the end:
val unindexed = combined.sortByKey().values
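To check the result, a sketch assuming the three RDDs were built from the question's data with a SparkContext named sc:

```scala
// Hypothetical setup matching the question:
//   val rdd1 = sc.parallelize(1 to 10)
//   val rdd2 = sc.parallelize(11 to 14)
//   val rdd3 = sc.parallelize(15 to 20)
// indexedRDD2 covers keys 0..3 and indexedRDD3 keys 4..9, so every key
// of indexedRDD1 (0..9) finds exactly one partner in the joined result.
unindexed.collect()
// Array((1,11), (2,12), (3,13), (4,14), (5,15),
//       (6,16), (7,17), (8,18), (9,19), (10,20))
```

Note that this pairs each element of rdd1 with the element at the same overall position in rdd2-then-rdd3, regardless of how the data is partitioned, which is what zip only guarantees when the partition layouts happen to match.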