Monday, March 21, 2016

Little differences and Big Data


Typically, in Spark and friends, a small amount of code is executed against a large amount of data. Because of this, your code needs to be efficient. I've just come across code that was taking more time to read and parse the data than to do any actual computation on it!

Don't be lazy

The problem showed up when I looked at the threads of the cluster's executors (you don't need jstack for this; you can do it from the Spark GUI). Most executor threads were BLOCKED accessing a lazy val, while only one thread was RUNNABLE inside it. Oops - lazy vals use synchronization under the covers (see here for why lazy can be expensive).
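For reference, a Scala lazy val compiles to roughly the double-checked locking pattern sketched below (this is illustrative Java, not the actual generated code; the field names are mine). When many threads hit the val before it has been initialised, they all queue on the same monitor:

```java
public class LazyHolder {
    private volatile boolean initialized;
    private String value;

    public String get() {
        if (!initialized) {              // fast path once initialised
            synchronized (this) {        // racing threads BLOCK here
                if (!initialized) {
                    value = expensiveInit();
                    initialized = true;
                }
            }
        }
        return value;
    }

    private String expensiveInit() {
        return "computed";
    }
}
```

Only one thread runs expensiveInit; the rest show up as BLOCKED in a thread dump, which is exactly the picture the Spark GUI was painting.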

Not always obvious

Another innocuous looking piece of code might be this:

    private static final Random random = new java.util.Random();

    static long[] generateRandom(int num, long maxExclusive) {
        long    bitMask = randomBitMask(maxExclusive);  // fixed random mask for this call
        int     index   = 0;
        long    count   = 0;
        long[]  results = new long[num];

        while (index < num) {
            long nextVal = count ^ bitMask;  // XORing a counter with a fixed mask never repeats
            if (nextVal < maxExclusive) {    // discard values outside the requested range
                results[index] = nextVal;
                index++;
            }
            count++;
        }
        return results;
    }

    private static long randomBitMask(long maxExclusive) {
        long seed = random.nextLong();
        return seed & createMask(maxExclusive);
    }

    private static long createMask(long highest) {
        // all ones up to and including the highest set bit of 'highest'
        long leftMost = Long.highestOneBit(highest);
        return (leftMost << 1) - 1;
    }


This is an attempt at generating a non-repeating list of random numbers using a Linear Feedback Shift Register.

Basically, I wanted a list of (semi-)random numbers that don't repeat. If you want only a few from a large set, you might just pick a random one, add it to the list, and retry whenever you've seen it before. That's fine, but if you want lots of numbers out of a set only slightly larger (that is, when num is comparable to maxExclusive), you'll be doing a lot of retrying. This is where the code above comes in. Unfortunately, it is considerably slower than the "try again" approach when num is a good deal less than maxExclusive.
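The "try again" approach (my uniqueRandomNumbersSmallNumLargeMax, whose body isn't shown in this post) presumably looks much like this sketch: pick candidates at random and use a Set to reject repeats.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class RetryUnique {
    private static final Random random = new Random();

    // Sketch (assumption): a retry-based implementation along the lines of
    // the uniqueRandomNumbersSmallNumLargeMax used in the benchmark below.
    static long[] uniqueRandomNumbersSmallNumLargeMax(int num, long maxExclusive) {
        Set<Long> seen = new HashSet<>();
        long[] results = new long[num];
        int index = 0;
        while (index < num) {
            long candidate = (long) (random.nextDouble() * maxExclusive);
            if (seen.add(candidate)) {   // add() returns false on a repeat
                results[index++] = candidate;
            }
        }
        return results;
    }
}
```

When num is much smaller than maxExclusive, repeats are rare, so each number costs roughly one random draw plus a hash insert.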

So, benchmarking with this in the pom.xml:

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>${jmh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>${jmh.version}</version>
        </dependency>


(where jmh.version is 1.11.3) and a class in src/test/java that looks something like this:

    public static final int LARGE = 100000000;

    /**
     Result "javaRandomNumbersBenchmark_LARGE":
     198.036 ±(99.9%) 37.176 ops/s [Average]
     (min, avg, max) = (123.962, 198.036, 290.256), stdev = 42.812
     CI (99.9%): [160.861, 235.212] (assumes normal distribution)

     Benchmark                                               Mode  Cnt    Score    Error  Units
     RandomStatsBenchmark.javaRandomNumbersBenchmark_LARGE  thrpt   20  197.703 ± 36.994  ops/s

     */
    @Benchmark
    @Fork(1)
    public void javaRandomNumbersBenchmark_LARGE() {
        JavaRandomStats.generateRandom(10, LARGE);
    }

    /**
     Result "simpleUniqueRandomNumbersBenchmark_LARGE":
     3103855.467 ±(99.9%) 24691.158 ops/s [Average]
     (min, avg, max) = (2845502.900, 3103855.467, 3277692.295), stdev = 104543.910
     CI (99.9%): [3079164.308, 3128546.625] (assumes normal distribution)
     */
    @Benchmark
    @Fork(1)
    public void simpleUniqueRandomNumbersBenchmark_LARGE() {
        uniqueRandomNumbersSmallNumLargeMax(10, LARGE);
    }

I got the results you see in the comments: the simple retry approach is roughly four orders of magnitude faster (about 3.1 million ops/s versus about 200). I'm not currently sure why my attempted optimisation is such a performance killer.

More contention

Spark's broadcast variables are an efficient way to share read-only data across executors without repeated network calls. However, in the documentation's examples, you'll see code like this:

records mapPartitions { rowIter =>
  rowIter map { record =>
    val x = broadcastX.value
    // ... do something with x
  }
}

But I noticed a lot of contention when reading the value, deep down in TorrentBroadcast.readBroadcastBlock (Spark 1.3.1). Rewriting the code slightly to:

records mapPartitions { rowIter =>
  val x = broadcastX.value
  rowIter map { record =>
    // ... do something with x
  }
}

meant the executor threads suffered far less contention, since the broadcast value is now fetched once per partition rather than once per record. Doing this saved another 10% or so.

