Friday, January 20, 2017

HBase File Structure


Some notes from a colleague who knows HBase better than I do.

Quick overview of how HBase stores data

When HBase batch writes in-memory data to disk, it may do so in separate files. Each file is sorted by the key.

So, say HBase persists data with keys, A, F, B. It sorts them so the file looks like:

[A, B, F]

Some time later, HBase persists G, P and M. Now there are two files that look like:

[A, B, F],  [G, M, P]

At some later time still, data with keys C, R and D arrive. Key R clearly comes after the last letter already written to the files (P) but D and C comes in the middle of the first file. So, what does it do? It creates a new file with [C, D, R] in it. To summarize, the files look like this:

[A, B, F], [G, M, P], [C, D, R]

Even later, a client wants to find the value for key D. Assuming it's not cached in memory, where does HBase look for it? Well, although all data within a file is ordered (so we can call off the search early if it's not where we would expect it in the file), we don't know which file D is in. HBase must search all files even if it doesn't search through all of a file.

Compaction

To help matters, HBase can compact these three files into one that looks like:

[A, B, C, D, F, G, M, P, R]

Well, that makes searching much easier.

Read-Write Patterns

How can we leverage this? Well, in certain read-write patterns, we could force a compaction at an appropriate point. For instance, I'm using HBase to store a dictionary I build when processing a corpus of text. Later, I refer to that dictionary but I never update or add any more data. So, after I have finished my last write, I can call:

hbaseConnection.getAdmin.majorCompact(TABLE_NAME)
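
For context, here's roughly what that looks like as a self-contained sketch (the connection setup and the table name are assumptions on my part; adapt them to your own cluster):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    // Assumed table name; substitute your own
    val TABLE_NAME      = TableName.valueOf("my_dictionary")
    val hbaseConnection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin           = hbaseConnection.getAdmin

    // Ask the region servers to major-compact the table. Note that this call is
    // asynchronous - it returns as soon as the request has been made.
    admin.majorCompact(TABLE_NAME)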

After compacting the table, my read throughput went up by a factor of 4.

For more information on HBase compaction see here.



Thursday, December 22, 2016

Spark and the GC


Spark is memory hungry and that means you need to consider the garbage collector when tuning.

There are a number of GCs to choose from, and they may or may not be appropriate for your job. Some say that G1 can give you (soft) guarantees on maximum pause times but is not a good fit for batch jobs. Others point out that CMS may give lower pauses than ParallelGC (the default on 64-bit machines) but that ParallelGC has better overall throughput, and since we're running a batch job rather than a web server, ParallelGC may suit Spark.

More logs than a lumberjack

Whichever GC you use, you'll need to turn logging on and analyse the logs.
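
For example, here's one way (a sketch on my part, not the post's actual configuration) to switch GC logging on for the executors, using the JDK 8 flags:

    import org.apache.spark.SparkConf

    // JDK 8 flags; with JDK 9+ you'd use -Xlog:gc* instead. Driver-side flags have to be
    // passed on spark-submit (--driver-java-options) because the driver JVM has already
    // started by the time this SparkConf is read.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps")

The output ends up in each executor's stderr, which you can get to from the Spark or YARN web UI.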

You can see objects being promoted from young to old generation (see here and here) but it's inferred rather than stated explicitly. At a young GC, you'll see something like:

[PSYoungGen: BEFORE_YOUNG->AFTER_YOUNG(CAPACITY_YOUNG)] BEFORE_HEAP->AFTER_HEAP(CAPACITY_HEAP)

This describes the memory usage in the young generation before the collection (BEFORE_YOUNG), after it (AFTER_YOUNG), and the capacity of the young generation (CAPACITY_YOUNG).

It also shows the usage of the heap as a whole before (BEFORE_HEAP), after (AFTER_HEAP) and the heap's total capacity.

Note that the amount freed from the young generation is not necessarily the same as the amount freed from the heap as a whole; that is,

Δ = (BEFORE_YOUNG - AFTER_YOUNG) - (BEFORE_HEAP - AFTER_HEAP)

need not be zero. This is because not all of the young generation was actually reclaimed: some of it was promoted to the old generation.

In an attempt to avoid premature promotion, and with JVMs that had 8gb of memory, I set -XX:MaxNewSize=6G -XX:SurvivorRatio=6 but instead of taking 5 hours, it took 6. I tried this because there was a lot of new generation GC and not a lot of old.

Trying G1GC took just over 6 hours. And setting -XX:MaxNewSize=1G -XX:SurvivorRatio=3 just had to be killed as it looked like it was going to take days. So, I had to look elsewhere.

Disks

You can also check your disk access times by using a script taken from here, namely:

dd if=/dev/zero of=/tmp/output.img bs=8k count=256k
rm /tmp/output.img

A decent HDD will typically give you about 300MB/s, a bog-standard SSD (my NUC at home) about 500MB/s.

The best way to avoid Garbage Collection...

... is not to create any garbage. This might not always be possible but you can minimize GC. Kryo will compress objects amazingly well by replacing the verbose String that represents their FQNs with just a byte or two. However, it will only compress components of the class you register with it, not a deep tree of components. For instance, if you register org.apache.spark.mllib.linalg.SparseVector, you'd do well to also register Array[Int] and Array[Double] as these are what hog memory.

It is not sufficient to just register Array[_].

You can put something like sparkConf.set("spark.kryo.registrationRequired", "true") in your code to see what needs to be explicitly added. This will cause runtime exceptions to be thrown until everything to be (un)compressed has been explicitly registered.
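
As a sketch of what that looks like (the SparseVector and array registrations are the ones mentioned above; anything else is whatever the registrationRequired exceptions tell you to add):

    import org.apache.spark.SparkConf
    import org.apache.spark.mllib.linalg.SparseVector

    val sparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fail fast on anything not registered so we can find out what's hogging memory
      .set("spark.kryo.registrationRequired", "true")
      .registerKryoClasses(Array(
        classOf[SparseVector],
        classOf[Array[Int]],      // the indices inside a SparseVector
        classOf[Array[Double]]    // the values inside a SparseVector
      ))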

After these changes, the shuffle went from some 2.3TB to 300GB and consequently the time for the job dropped from about 5 hours to less than 45 minutes. With less memory to churn there was less GC and this makes a huge difference to how long your Spark job takes.

Friday, December 16, 2016

More Spark tuning


I've spent some time trying to tune matrix multiplication in Spark. I had 32 executors each with 4 threads and sampled the stacks with jstack over a few minutes with a handy little script:

> cat ~/whatsEveryoneDoing.sh
for BOX in `cat ~/boxes.txt` ; do { echo $BOX ; ssh $BOX "for PID in \`ps -ef | grep $1 | grep jdk8 | grep -v bash | awk '{print \$2}'\` ; do { sudo -u yarn jstack \$PID | grep -A45 ^\\\"Exec ; } done ; echo " 2>/dev/null ; } done

where ~/boxes.txt is just a file with the addresses of all the boxes in the cluster and the argument with which you invoke it is the application ID of the job.

I noticed that of my 128 executor threads, in each sample 50 or so were contending for the lock at:

org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:570)

(This is Spark 2.0.2).

So, reducing --executor-cores from 4 to 2 while only increasing the number of executors from 32 to 48 had a big impact. The runtime was reduced from about 7.5 hours to 5 hours.
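
For reference, the same settings can be made in code rather than on the spark-submit command line (the numbers are the ones from this job, not a general recommendation):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.instances", "48")  // up from 32
      .set("spark.executor.cores",     "2")   // down from 4: fewer threads fighting over the MemoryStore lock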

Wednesday, December 14, 2016

Bayesian Fun


The Model

Dr Bristol has tea with statistician Sir Ronald Fisher. She claims she can tell the difference between when the tea is poured first then the milk or milk first then the tea. Sir Ronald doesn't quite believe this and tests her. Sure enough, she gets it right five out of six times. Is this statistically significant?

Sir Ronald "arrived at a method that consists of calculating the total probability of the observed result plus the probability of any more extreme results possible under the null hypothesis (i.e., the probability that she would correctly identify 5 or 6 cups by sheer guessing). This probability is the p-value. If it is less than .05, then Fisher would declare the result significant and reject the null hypothesis of guessing." (from here).

The p-value debate has been rumbling for a while ("for example, a 0.05 p-value does not imply the false positive rate is 5.0%, it can be much higher."). Briefly, if there are 100 000 hypotheses and only 10% of them are true, then of the 90 000 that are false, 4 500 will randomly fall within the 5% (p-value = 0.05) range. So, 4 500 of the 14 500 hypotheses that we think are true are in fact false - and this is if we correctly identify all of the true 10%! That means 31% of our knowledge is rubbish, and that's the best-case scenario!
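
Spelling that arithmetic out (the numbers are just the ones from the sentence above):

    \begin{aligned}
    \text{true hypotheses}        &= 0.10 \times 100\,000 = 10\,000 \\
    \text{false positives}        &= 0.05 \times 90\,000  = 4\,500  \\
    \text{apparently true}        &= 10\,000 + 4\,500     = 14\,500 \\
    \text{fraction that is false} &= \frac{4\,500}{14\,500} \approx 31\%
    \end{aligned}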

Devout Bayesian, Prof Dennis Lindley, in this article discusses why they are inadequate and proposes a Bayesian solution.

The Problem

Lindley notes that when using p-values, the probabilities change depending on your experiment and not the results. To illustrate:

"If the experiment consisted of 6 pairs of cups being tested and the result was RRRRRW, the relevant probability is .109", that is 6C5 ½6C6 ½6 and here, R means 'right' and W means 'wrong'.

"If the experiment consisted of pairs being tested until the first error, with the same result, the relevant probability is .031", that is ½5.

This "leads to absurdities because it hinges upon the nonexistent ability to define what other unobserved results would count as “more extreme” than the actual observations. That is, if Fisher had set out to serve Dr. Bristol 6 cups (and only 6 cups) and she is correct 5 times, then we get a p-value of .1, which is not statistically significant. According to Fisher, in this case we should not reject the null hypothesis that Dr. Bristol is guessing. But had he set out to keep giving her additional cups until she was correct 5 times, which incidentally required 6 cups, we get a p-value of .03, which is statistically significant. According to Fisher, we should now reject the null hypothesis. Even though the data observed in both cases are exactly the same, we reach different conclusions because our definition of “more extreme” results (that did not occur) changes depending on which sampling plan we use." (Etz et al)

Bayesian Analysis

Bayes defined the posterior probability as:

posterior = K x prior x likelihood

"where K is a number chosen to make the integral of the right-hand side 1." (Lindley)

"Likelihood is a funny concept. It’s not a probability, but it is proportional to a probability ... Since a likelihood isn’t actually a probability it doesn’t obey various rules of probability. For example, likelihood need not sum to 1.

"A critical difference between probability and likelihood is in the interpretation of what is fixed and what can vary. In the case of a conditional probability, P(D|H), the hypothesis is fixed and the data are free to vary. Likelihood, however, is the opposite. The likelihood of a hypothesis, L(H|D), conditions on the data as if they are fixed while allowing the hypotheses to vary.

"Bayes factors are simple extensions of likelihood ratios. A Bayes factor is a weighted average likelihood ratio based on the prior distribution specified for the hypotheses." [from Etz's blog]

The Priors

Lindley considers the case of wine and tea tasting and people who claim they can tell you about the process with which they were made. The author believes that wine experts really know what they're tasting while tea experts do not.

Lindley encodes these beliefs in the following equations:

Belief that the wine taster can correctly identify wines with probability P:

48(1-P)(P-½) for ½ < P < 1

"This expresses the fact that I think that she can discriminate but can make mistakes. The value 48 makes the total probability 1," he says.

Belief that the tea taster can correctly identify the preparation process with probability P:

1.6(1 - P) for P ≥ ½

Note that the density 1.6(1 - P) integrates to 0.2 over ½ ≤ P ≤ 1; Lindley puts the remaining 0.8 of the prior probability as a lump at P = ½ (i.e. the taster is simply guessing), so that the total probability is 1 (the area under the curve cannot exceed 1.0 as that's the highest a probability can go).
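
As a quick check on the normalisation (the lump of 0.8 at P = ½ is taken from Lindley's paper):

    \int_{1/2}^{1} 48(1-P)\left(P-\tfrac{1}{2}\right)\,dP = 1
    \qquad
    \int_{1/2}^{1} 1.6(1-P)\,dP = 0.2,\ \text{plus the lump of } 0.8 \text{ at } P=\tfrac{1}{2}, \text{ giving } 1.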

The Data

Given 6 trials, the probability that the taster correctly identifies all 6 is P⁶. The posterior in this case for tea is:

normalizer . prior . likelihood = K (1 - P) P⁶ for ½ < P < 1

The probability that the taster identifies the first 5 correctly but the last incorrectly is P⁵(1 - P).

"The prior value of this probability was .8, which drops to .59 when 1 error is made in 6 pairs", that is, P at the maximum posterior at RRRRRW is about 0.63.

"...and to .23 with no errors", that is, P at the maximum posterior for RRRRRR is about 0.86. So, the prior for this is 1.6 x (1 - 0.86) = 0.23

[Incidentally, we know this because the posterior ∝ P⁶(1 - P). So, if we differentiate to find the maximum:

d/dP (P⁶(1 - P)) = 0

or

6P⁵ - 7P⁶ = 0

which means P = 6/7. This is the probability of the taster having special powers. Plug this into our equation representing our belief and it becomes 1.6(1 - 6/7), which is the 0.23 Professor Lindley is talking about.]

Naturally, the graph for wine tasting is different as our prior is a quadratic, not a linear, function of P. Their peaks are somewhat closer to 1.0 than the tea taster's (as we'd expect, given the prior expresses greater confidence). But it's also worth noting the shape of the graphs: there is greater certainty when we see 6 Rs than with RRRRRW, as the curve is sharper.

Conclusions

"It is typically true that the posterior probability of the null hypothesis exceeds the significance level, though there is no logical connection between the two values." (Lindley)

"Lindley’s Bayesian approach depends only on the observed data, so the results are interpretable regardless of whether the sampling plan was rigid or flexible or even known at all. Another key point is that the Bayesian approach is inherently comparative: Hypotheses are tested against one another and never in isolation" (Etz et al).


Tuesday, November 29, 2016

Spark matrix multiplication example


Here's an example of cosine similarities in Spark using the previously described algorithm.

Imagine we have a matrix where the rows are vectors:

0      0.5    0.5    0.4
0.25   0.5    0      0
0.4    0.8    0      0.25
0      0      1.0    0

In Spark, this is represented by an RDD of SparseVectors. Each SparseVector represents one row in the matrix.

Recall that for each vector, we attach the corresponding row and column index to each non-zero element. We then reduceByKey using the column index as our key.
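
A sketch of that step (the names are mine; I'm assuming the RDD of SparseVectors has been zipped with its row index):

    import org.apache.spark.mllib.linalg.SparseVector
    import org.apache.spark.rdd.RDD

    // One record per non-zero cell: key = column index k, value = (row index, cell value)
    def byColumn(rows: RDD[(Long, SparseVector)]): RDD[(Int, (Long, Double))] =
      rows.flatMap { case (rowIndex, vector) =>
        vector.indices.zip(vector.values).map { case (k, value) => (k, (rowIndex, value)) }
      }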

We do this to all columns but as an example, let's take the second column (k=2). 

0      [0.5]   0.5    0.4
0.25   [0.5]   0      0
0.4    [0.8]   0      0.25
0      [0]     1.0    0

This gives us the tuples { (1, 0.5), (2, 0.5), (3, 0.8), (4, 0) } corresponding to k=2. Note that the first element of each tuple is the row index and the second is the value in the corresponding bracketed cell.

Since these tuples are keyed on k, all these values will end up in the same Spark partition.

Partition A (k=2) -> { (1, 0.5), (2, 0.5), (3, 0.8) }
Partition B (k=3) -> ...
Partition C ...

Note that we're not interested in values that are zero so we can discard the last tuple in the original set (zeroes simply disappear). For all the others, we want a Cartesian product of this set with itself and then we want to multiply out the values for each pair. Furthermore, we're not interested in the product of a tuple with itself and we only care about co-ordinates (i,j) when i < j because we don't want to do the same work twice.

So the result will look like this:

{ ((1,2), 0.25), ((1,3), 0.4), ((2,3), 0.4) }

At this point, we're no longer interested in k and can discard it. Each element of the above set moves to a partition according to the first element of the tuple. So, we'd expect something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4)
Partition C: ((2,3), 0.4)

Note that all the interesting information in the example so far was (arbitrarily) on Partition A. Now, it's been shuffled to Partitions B and C. This is just an example. The shuffled data could have ended up in any partition.

If we do the same again for, say, the last column, we'd expect the results of multiplying rows 1 and 3. The other rows have zero in that column so we're not interested in them.

Now, our Spark partitions would look something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1)
Partition C: ((2,3), 0.4)

When we're done with all the columns, our partitions will look something like:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.4), ((1,3), 0.1), ((1,4), 0.5)
Partition C: ((2,3), 0.4), ((2,3), 0.1)

Finally, if we reduce on each partition by the co-ordinates key, we'll get:

Partition A:
Partition B: ((1,2), 0.25), ((1,3), 0.5), ((1,4), 0.5)
Partition C: ((2,3), 0.5)

which if we imagine is a matrix looks like this:

0   0.25   0.5   0.5
0   0      0.5   0
0   0      0     0
0   0      0     0

But, wait, that's not what our matrix multiplied with its transpose should look like. If we take the matrix at the top of this page and multiply it by its transpose we get:

0.66   0.25     0.5      0.5
0.25   0.3125   0.5      0
0.5    0.5      0.8625   0
0.5    0        0        1.0

The reason, of course, is that we're not interested in the diagonal as that's the similarity of a vector with itself (in real life, the diagonals would each have value 1.0 but I wanted to keep the numbers simple). What's more, we're only interested in half the matrix (the upper half) as the lower half is the same and we don't want to do the same work twice (in fact, any matrix multiplied by its transpose will be symmetric like this).

Consequently, only the upper half (the cells above the diagonal) is the same as the matrix we calculated. The rest of it is of no use to us.

An Alternative Approach to Matrices in Spark


I've been writing some code that performs TF-IDF on a corpus of text made up of M records. Given the frequency of each term, I can build a normalized vector for each record made up of the terms in it. To reduce complexity, I ignore terms that appear more than W times.

Cosine Similarities and Matrix Multiplication

(Note: since Spark 1.6.0, there is a method in IndexedRowMatrix.columnSimilarities to compute cosine similarities. However, this only delegates to an old 1.2.0 method that uses DIMSUM. This blows up on anything remotely large with OOMEs. See here for more information).

If we have a set of vectors that we want to multiply together to find their cosine similarities, we can imagine them as a matrix where each row in that matrix is one of our vectors.

So, imagine we have M vectors { v1, v2, v3, ... vM }. Each vector is made up of N elements so, for instance, vector v1 can be expressed as { v11, v12, v13 ... v1N }.

We can express this as a matrix that looks like:

v11  v12  v13  ...  v1N
v21  v22  v23  ...  v2N
v31  v32  v33  ...  v3N
...  ...  ...  ...  ...
vM1  vM2  vM3  ...  vMN

(You'll notice that each row represents a record and each column represents a term. If a term appears in that record then vij is non-zero).

What we want is a matrix that holds every vector multiplied by every other vector. For instance, for v1 and v2 we want

v11v21 + v12v22 + v13v23 + ... + v1Nv2N

Well, this is the same as multiplying the matrix by its own transpose:

v11  v12  v13  ...  v1N       v11  v21  v31  ...  vM1
v21  v22  v23  ...  v2N       v12  v22  v32  ...  vM2
v31  v32  v33  ...  v3N   x   v13  v23  v33  ...  vM3
...  ...  ...  ...  ...       ...  ...  ...  ...  ...
vM1  vM2  vM3  ...  vMN       v1N  v2N  v3N  ...  vMN

and looking at element (1, 2) of that matrix for this particular pair in this particular example.

So, using the brevity of summation notation, if this matrix is called A then:


        N
Aij  =  Σ  vin vjn
       n=1

Or, using Einstein notation, we can state it even more succinctly:

Aij = vin vjn

since the n index is repeated.

Given two matrices, one m x n and the other n x p, the complexity of multiplying them is O(mnp). In our case, we're multiplying a matrix by its transpose, so p = m. Therefore, the complexity is O(m²n) - pretty bad.

The good news is that our matrix is sparse. How sparse depends on our variable W that determines at what point we start discarding terms in our TF-IDF stage.

Spark Implementation

Since we are just multiplying a matrix by its transpose, we might want to use BlockMatrix.multiply. However, the documentation tells us:
"The output BlockMatrix will only consist of blocks of DenseMatrix. This may cause some performance issues until support for multiplying two sparse matrices is added."
Since M and N for us are typically hundreds of millions, this is a problem.

So, we might want to just manually multiply any two vectors, i and j, in our equation for Aij. The trouble is that if we have two vectors in a Spark RDD, vi and vj, they may very well live in different partitions. And since we want to calculate vi . vj for all i, j, we'll definitely be hitting all partition combinations and there will be O(M²) multiplications.

We want to reduce the amount of inter-partition shuffling and avoid O(M²) multiplications of N-sized vectors. One solution might be to take each vector and, for each non-zero element, attach its column ID, which we'll call k. We can then reduceByKey (where the k is the key) and start multiplying out the set of values for each key.

That is, associated with every column, k, there will be a set of vectors that have non-zero elements for their k-th element. There will be a maximum of W of them since that's the limit we imposed on how often a term appears before we consider it insignificant. We'll call this set K where in set-builder notation:

K = { vxk ∈ A | vxk ≠ 0 and |K| ≤ W}

We then take a Cartesian product of this set and attach the tuple (i,j), that is, its co-ordinates such that we have a set, D, where:

Dk = { ((i,j), vik vjk) | vik, vjk ∈ K and i < j }

We ignore the case where i=j since we're not interested in the cosine similarities of a vector with itself. We know it's 1 as they're unit vectors.

Note that this can be computationally expensive as it scales as O(W²).

Now we can reduceByKey on the co-ordinates (i,j) and we'll have obtained the cosine similarity for all i, j.
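
Putting the steps above together, here is a minimal sketch of the whole thing (the names are mine; the rows are assumed to be unit vectors keyed by a Long row index, I use groupByKey to gather "the set of values for each key", and I've left out the W cut-off and any partitioning tricks):

    import org.apache.spark.mllib.linalg.SparseVector
    import org.apache.spark.rdd.RDD

    // (row index, unit vector) -> ((i, j), cosine similarity) for i < j
    def cosineSimilarities(rows: RDD[(Long, SparseVector)]): RDD[((Long, Long), Double)] = {
      // 1. Explode each vector: key every non-zero cell by its column index k
      val byColumn: RDD[(Int, (Long, Double))] = rows.flatMap { case (i, vector) =>
        vector.indices.zip(vector.values).map { case (k, value) => (k, (i, value)) }
      }

      // 2. Gather the cells of each column into one place (bounded by W in the real job)
      val columns: RDD[(Int, Iterable[(Long, Double)])] = byColumn.groupByKey()

      // 3. Within each column, multiply every pair of cells (i < j only) and re-key by (i, j)
      val partialProducts: RDD[((Long, Long), Double)] = columns.flatMap { case (_, cells) =>
        for {
          (i, vi) <- cells
          (j, vj) <- cells
          if i < j
        } yield ((i, j), vi * vj)
      }

      // 4. Sum the per-column contributions to get the dot products, i.e. the cosine similarities
      partialProducts.reduceByKey(_ + _)
    }

In a real job you'd cap each column's set at W and probably prefer aggregateByKey to a naked groupByKey, but this is the shape of it.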

Validity of this approach

Mathematically, if our column index is k and we call the contribution of the term corresponding to k to any cosine similarity between any pair of vectors (i,j), Di,jk then:

Di,jk = vik vjk

which is just our equation above for Aij (with n = k), thus proving that this technique is at least mathematically correct. How well it performs is something else...

By making the key the first element of a tuple, all values will live in the same partition. Spark will partition Tuple2s according to the first element in the tuple.

Partitions

We can demonstrate this so:

    import scala.util.Random

    type Pair = (Long, Long)

    // 100 pairs whose first element runs from 1 to 100 and whose second is random
    val pairs = (1 to 100).map(x => (x.toLong, Random.nextLong()))
    val rdd   = sparkContext.parallelize(pairs)

which gives us an RDD of random pairs of Longs, the first being between 1 and 100. Now we create a function that can map over the RDD assigning each element a number unique to the partition it's in:

  import java.util.concurrent.atomic.AtomicInteger
  import scala.collection.mutable.ArrayBuffer

  type Enumerated[T] = (Int, T)

  // Tags every element with an id unique to the partition it lives in. The counter is
  // bumped once per partition because mapPartitions invokes the function once per
  // partition's iterator. (Each JVM gets its own copy of idGenerator, so the ids are
  // only globally unique when everything runs in a single JVM.)
  val idGenerator = new AtomicInteger(0)
  def partitionMapper[T]: Iterator[T] => Iterator[Enumerated[T]] = { pairIterator =>
    val partitionId = idGenerator.getAndIncrement()
    val enumerated  = ArrayBuffer[Enumerated[T]]()
    pairIterator.foreach { pair =>
      enumerated += ((partitionId, pair))
    }
    enumerated.toIterator
  }

Finally, we see the output:

    val partitioned  = rdd mapPartitions partitionMapper[Pair]
    val inMem        = partitioned.collect()
    val groupedPairs = inMem.groupBy(_._1).toSeq.sortBy(_._1)
    groupedPairs foreach { case (partitionId, enumerated) =>
      val vals = enumerated.map(_._2._1)
      println(s"partition #$partitionId: ${vals.min}  to ${vals.max} ")
    }

Which produces:

partition #0: 51  to 75
partition #1: 1  to 25
partition #2: 26  to 50
partition #3: 76  to 100

Very neat.

Conclusion

It looks like I am not the only person rolling their own distributed matrix multiplication solution using Spark (see this link here). It seems others have found you don't get it out of the box. There appear to be moves afoot to address this.

Sunday, November 27, 2016

GraphX


Spark tries to abstract you from the data so you can write beautiful, functional code. The truth is that you have to get your hands dirty if you don't want it to run like a dog.

I'm processing a graph of over 100 million edges and about 20 million vertices. It's been a learning experience to make it perform. It first took over 24 hours to execute the Connected Components algorithm. I finally managed to run it in less than an hour. But it wasn't easy.

Cluster configuration

I didn't find GraphX very memory hungry, at least not for the executors (the entire graph was only some 4gb on HDFS). After 24 hours on my first attempt, the job looked far from over. Using jstat, I noticed that the driver was GCing a lot. It had 8gb but I increased it to 20 and it was much happier.

Although the usual advice is to run a larger number of executors with fewer cores each, for GraphX this might not be the best configuration. Facebook found:

"Even though both systems can run multiple workers on a single physical machine, we found that it is more efficient when we start only one worker process per machine and allocate all the available memory to it. In the following experiments, the number of workers is also the number of physical machines used for each job. We allocated 80 GB of RAM to each worker."

The shape of the graph

What your graph looks like makes a difference to the performance of GraphX. Looking at the maximum number of "Shuffle write Size / Records" in each super step (mapPartitions at GraphImpl.scala:207 in Spark 2.0.2) in the Spark web GUI, we see the numbers steadily decreasing.

This depends on the shape of the graph.

"GraphFrames and GraphX both use an algorithm which runs in d iterations, where d is the largest diameter of any connected component (i.e., the max number of hops between any 2 nodes in a component). So the running time will vary significantly depending on the your graph's structure. Tuning the vertices and edges DataFrames by making sure to cache them and possibly adjust the partitioning beforehand can help." (from DataBricks forum)

As somebody pointed out on StackOverflow:

"If you have a cluster where each of the vertexes is connected to every other vertex, then after one round of messages each one knows who the lowest VertexID is, and they all go silent the next round. One sequential step, the entire cluster.

"If, on the other hand, each vertex in the cluster is only connected to at most 2 other vertices, then it could take N sequential steps before all the vertices know who what the minimum VertexID is."

I tested this by making a graph that was just a chain of numbers 1, 2, 3, 4 ... 20. I found that after 10 super-steps, there were 10 resolved connected components: {1 to 11}, {12}, {13}, {14} ... {20}.
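
For what it's worth, here's roughly how that chain test can be set up (my own sketch; sc is the SparkContext, and capping the number of iterations is what stops the run after 10 supersteps):

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.graphx.lib.ConnectedComponents

    // A chain 1 - 2 - 3 - ... - 20
    val chainEdges = sc.parallelize((1L until 20L).map(i => Edge(i, i + 1, ())))
    val chain      = Graph.fromEdges(chainEdges, ())

    // Cap the Pregel loop at 10 supersteps and see how far label propagation has got
    val after10 = ConnectedComponents.run(chain, 10)
    val labels  = after10.vertices.map { case (_, componentId) => componentId }.distinct()
    println(labels.count())   // the post observed 10 distinct components at this point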

Partitioning

How do you partition a graph? Facebook said that they got good results using EdgePartition2D. The trick here is to imagine the graph as a matrix where, for a given vertex X, all non-zero elements in row X indicate an outgoing edge and all non-zero elements in column X indicate an incoming edge.

Therefore, all the details for a given vertex will be confined to a single row and a single column. So, for an NxN matrix, all the data for X sits in at most 2N elements (i.e. one row plus one column). Equivalently, if the matrix is split into a roughly square grid of blocks, one block per partition, then X's row and column touch at most O(√P) of the P partitions - this is the 2·sqrt(numParts) bound on vertex replication that EdgePartition2D advertises. (Here I'm assuming a maximum of one edge per pair of vertices, which is not a problem for the connected components algorithm.)
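
Applying that strategy is a one-liner; a sketch, assuming graph is the Graph you've already built:

    import org.apache.spark.graphx.PartitionStrategy

    // Lay the edges out on a roughly sqrt(P) x sqrt(P) grid of blocks
    val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)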

But the biggest saving came from reducing the number of partitions. The previous stage of the pipeline had deliberately and explicitly increased the number of partitions to avoid running out of memory when performing matrix multiplication. This can be a disaster for GraphX.

The previous job had increased the number of partitions to some 18k. What's going on is described here:
As the graph algorithm progresses, it is common for less and less [sic] of the vertices to remain active. Therefore a full scan of all triplets during a map-reduce becomes less and less effective. “For example, in the last iteration of connected components on the Twitter graph, only a few of the vertices are still active. However, to execute mrTriplets we still must sequentially scan 1.5 billion edges and check whether their vertices are in the active set.”
Although GraphX has clever optimizations here, there was still a huge amount of overhead in constantly mapping and reducing.

After coalescing the 18 000 partitions down to a mere 360 on a cluster of 8 boxes with 24 cores each, the run time dropped to a healthy 10-40 minutes depending on sub-graph structures.
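
The post doesn't show the coalescing itself, but one way to do it (a sketch; loadEdges and the edge attribute type are placeholders of mine) is to shrink the edge RDD before handing it to GraphX:

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.rdd.RDD

    val rawEdges: RDD[Edge[Int]] = loadEdges()   // hypothetical loader; arrives with ~18,000 partitions
    val edges = rawEdges.coalesce(360)           // 8 boxes x 24 cores = 192 cores, so a little under 2 tasks per core
    val graph = Graph.fromEdges(edges, 0)        // 0 is just a default vertex attribute

    val components = graph.connectedComponents().vertices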