Tuesday, November 24, 2015

Spark's sortByKey doesn't


... or at least not when you map. Sure, if you collect you get the elements back in sorted order. But let's say you want to process the elements in a given order, for example finding the difference from one element to the next. You might naively write:

    val pairRdd   = sparkContext.parallelize((1 to 10), 3).map(x => (x, x))
    val sortedRdd = pairRdd.sortByKey()

    bogusMap(sortedRdd)
...
  import org.apache.spark.rdd.RDD

  def bogusMap(sortedRdd: RDD[(Int, Int)]): Unit = {
    var last = 0
    // Insist that each key is exactly one greater than the previous key this closure saw.
    def checkMonotonicKeys(kv: (Int, Int)): Int = {
      val key = kv._1
      if (key != last + 1) throw new IllegalStateException(s"key = $key, last = $last")
      last = key
      key
    }
    val mappedAndSorted = sortedRdd.map(checkMonotonicKeys(_))
    mappedAndSorted.collect().foreach { kv =>
      println(kv)
    }
  }

But you'll see an exception thrown, something like:

java.lang.IllegalStateException: key = 8, last = 0

The reason is that the keys are only guaranteed to be in order within each partition, not across all partitions, and the map closure runs on each partition independently: every task gets its own copy of last, so the check restarts from 0 at each partition boundary.
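
You can see where those partition boundaries fall with something like the following sketch (not from the original post; glom() gathers each partition into a local array so its contents can be printed):

    // Print the keys held by each partition of the sorted RDD.
    // With (1 to 10) over 3 partitions, expect something like 1-3, 4-7, 8-10,
    // though the exact split may vary.
    sortedRdd.glom().collect().zipWithIndex.foreach { case (partition, i) =>
      println(s"partition $i: ${partition.map(_._1).mkString(", ")}")
    }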

One "solution" is to ensure that all the elements end up in a single partition, for example:

    val sortedInto1Partition = pairRdd.sortByKey(numPartitions = 1)
    bogusMap(sortedInto1Partition)

This works, but there is little point in using Spark for it since there is no parallelization. The best solution is to generate the differences as the data comes in.
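
For instance, if consecutive differences are what you ultimately need, a minimal sketch of that idea (plain Scala before anything is handed to Spark; the names here are illustrative, not from the post) might look like:

    // Compute the differences sequentially at ingestion time,
    // then parallelize the already-derived values.
    val values   = (1 to 10)
    val diffs    = values.sliding(2).map { case Seq(a, b) => b - a }.toList
    val diffsRdd = sparkContext.parallelize(diffs, 3)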

Incidentally, this article has a good description of what is happening during a sortByKey operation. Basically, each shuffle has two sides. The first "writes out data to local disk" and the second makes "remote requests to fetch that data... The job of the [first] side of the shuffle is to write out records in such a way that all records headed for the same [second] task are grouped next to each other for easy fetching." Note that the second task that groups data is not obligated to also order it within a group.

As another aside, note the importance of persisting an RDD in this use case.

"Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data. Without persistence, use of the partitioned RDD will cause reevaluation of the RDDs complete lineage. That would negate the advantage of partitionBy(), resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.

"In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information. For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively. On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record." [1]

The code above that forces all the data into one partition (using numPartitions = 1) seems immune to map forgetting the parent RDD's partitioning information. Since there is only one partition, there is no information to forget.
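
To see the quoted behaviour directly, a small sketch along these lines (run against the pairRdd defined earlier; the results in the comments are what I'd expect, not output captured for this post) checks the partitioner before and after map() and mapValues():

    import org.apache.spark.storage.StorageLevel

    // Persist after the shuffle so later uses don't repeat the range partitioning.
    val ranged = pairRdd.sortByKey().persist(StorageLevel.MEMORY_ONLY)

    println(ranged.partitioner)                   // expect Some(RangePartitioner...)
    println(ranged.mapValues(_ + 1).partitioner)  // mapValues() keeps the partitioner
    println(ranged.map(identity).partitioner)     // expect None - map() forgets it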

[1] Learning Spark - Karau and Konwinski

