Friday, March 10, 2017

Hadoop and HBase in an integration test


It's always very nice to have your integration tests run out-of-the-box as soon as you check the code out of Git. No fiddling with your environment, just click and go.

Unfortunately, Hadoop uses some native Windows libraries, and for anyone developing on Windows this is a pain - particularly if you have a large, distributed team. So, here is a nice hack I've used a couple of times to get the integration tests running on developers' laptops without any manual configuration.

First, check the hadoop.dll and winutils.exe binaries into your repository. They're not very big (about 40kb). Then you need to set the system properties:

System.setProperty("java.library.path", BINARY_DIRECTORY)
System.setProperty("hadoop.home.dir",   BINARY_DIRECTORY)

where BINARY_DIRECTORY is the absolute path name of the directory the binaries live in.
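For example, BINARY_DIRECTORY can simply be derived from wherever the binaries were checked in. A minimal sketch, assuming a purely illustrative location under the test resources:

// Hypothetical location of the checked-in Windows binaries - adjust to your project layout.
val BINARY_DIRECTORY = new java.io.File("src/test/resources/hadoop/bin").getAbsolutePath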

Then, before running any Hadoop code, you need this dirty hack, which uses reflection to assure Hadoop that everything is OK:

val classLoader = this.getClass.getClassLoader

// Append BINARY_DIRECTORY to the class loader's native library search path
// (java.library.path is cached at JVM startup, so setting the property alone is not enough).
val usrPathsField = classOf[ClassLoader].getDeclaredField("usr_paths")
usrPathsField.setAccessible(true)
val usrPath     = usrPathsField.get(classLoader).asInstanceOf[Array[String]]
val newUsrPath  = new Array[String](usrPath.length + 1)
System.arraycopy(usrPath, 0, newUsrPath, 0, usrPath.length)
newUsrPath(usrPath.length) = BINARY_DIRECTORY
usrPathsField.set(classLoader, newUsrPath)

// Pretend the file system implementations are already loaded so FileSystem
// doesn't try to discover them via the ServiceLoader.
val fileSystemsLoadedField = classOf[org.apache.hadoop.fs.FileSystem].getDeclaredField("FILE_SYSTEMS_LOADED")
fileSystemsLoadedField.setAccessible(true)
fileSystemsLoadedField.setBoolean(null, true)

// Tell Hadoop its native code is not loaded so it falls back to the pure-Java implementations.
val nativeCodeLoadedField = classOf[org.apache.hadoop.util.NativeCodeLoader].getDeclaredField("nativeCodeLoaded")
nativeCodeLoadedField.setAccessible(true)
nativeCodeLoadedField.set(null, false)


(This is Scala but it translates to Java easily enough)

It's ugly and it's dirty but it does the job. It basically tricks Hadoop into thinking everything is OK. It's good enough for tests but don't do it in production. Also, it's not at all necessary for *Nix systems.

Having done this, you can start running the integration tests against HDFS which is backed by your own hard disk. Again, you'd never do this in production (what's the point?) but it really helps with tests.

You do this with the following code (assuming you now want HBase to live on top of Hadoop):

val configuration = org.apache.hadoop.hbase.HBaseConfiguration.create()
configuration.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
configuration.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
val utility = new org.apache.hadoop.hbase.HBaseTestingUtility(configuration)
utility.startMiniCluster()

And you're good to go. HBase and HDFS both working on a single machine.
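From here a test can create a table against the mini cluster and tear everything down afterwards. A minimal sketch - the table and column family names are just examples, not anything the cluster requires:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Bytes

// Create a throwaway table in the mini cluster (names purely illustrative).
val table = utility.createTable(TableName.valueOf("test_table"), Bytes.toBytes("cf"))
// ... exercise the code under test against `table` ...
utility.shutdownMiniCluster()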


Saturday, February 18, 2017

Knuth and Poissons


Trying to get my head around how Donald Knuth generates samples from a Poisson distribution. The formula is simple:

Let L = e^(-λ) and p = 1. Count the number of times you have to multiply p by uniformly distributed random numbers in the range [0, 1] before p drops below L; that count, minus one, is your Poisson-distributed sample.

Simple. But why does it work?

Poisson

The Poisson distribution relies on the Poisson process. That is, the probability that something is about to occur has nothing to do with what has gone on in the past. This is seen throughout nature and makes it very useful in the sciences. For example, the probability of an atom radioactively decaying has nothing to do with how long it has been sitting around beforehand.

In other words, if X = (X1, X2, X3, ...) is the sequence of inter-arrival times of a memory-less event:
"At each arrival time and at each fixed time, the process must probabilistically restart, independent of the past. The first part of that assumption implies that X is a sequence of independent, identically distributed variables [IIDs] . The second part of the assumption implies that if the first arrival has not occurred by time s, then the time remaining until the arrival occurs must have the same distribution as the first arrival time itself" (from here).
This memory-less property can be expressed mathematically like this:

P(X > t + s | X > s) = P(X > t)


Let F(t) = P(X > t) for t > 0. Then:

F(t + s) = P(X > t + s) = P(X > t + s | X > s) P(X > s) = P(X > t) P(X > s) = F(t) F(s)

where we first condition on X > s and then apply the memory-less property above. (The mathematically inclined may look at this equation and suspect that logarithms are about to make an appearance...)

Now, this is the clever bit. Let a = F(1). Then:

F(n) = F(1 + 1 + ... + 1) = F(1) F(1) ... F(1) = a^n

The proof for this being true for all rational n is here but, briefly, if we let λ = - ln (a) then:

F(t) = e^(-λt)

which, if you recall, is the probability that an event will happen at t or later. That is, it's a value between 0 and 1.

Now, we use a method called inversion sampling. The basic idea is that if y = F(x) is a (monotonic) probability curve like the one above, then taking uniformly random samples on the y-axis and reading off the corresponding values on the x-axis gives you x values distributed according to F.

Note that y represents a probability, so necessarily 0 ≤ y ≤ 1. Since a uniform random number generator over [0, 1] comes with pretty much any programming language, the problem is getting simpler.

The last piece of the jigsaw is documented here and explains why Knuth's algorithm is so simple. Basically, we rearrange the equation for F(t) by taking natural logs:

t = - ln F(t) / λ

and since we're using inversion sampling, we know that the probability F(t) is going to be simulated by a uniformly distributed random variable between 0 and 1. We call it U.

Now, each random sample Ui gives a corresponding value ti. The question Knuth asks is: how many random samples Ui do I need before the ti add up to a unit of time (ie, 1)?

More mathematically, what is k for:

Σ(i=1..k) ti            ≤  1       ≤  Σ(i=1..k+1) ti

-Σ(i=1..k) ln Ui / λ    ≤  1       ≤  -Σ(i=1..k+1) ln Ui / λ

Σ(i=1..k) ln Ui         ≥  -λ      ≥  Σ(i=1..k+1) ln Ui

ln(Π(i=1..k) Ui)        ≥  -λ      ≥  ln(Π(i=1..k+1) Ui)

Π(i=1..k) Ui            ≥  e^(-λ)  ≥  Π(i=1..k+1) Ui

(note that the inequalities flip when we multiply through by -λ)

which is exactly the condition Knuth's algorithm checks: keep multiplying uniform samples into a running product until it drops below e^(-λ), then report the number of multiplications minus one. Notice that the beauty of this is that it's computationally cheap for relatively small λ (you expect roughly λ + 1 multiplications per sample). This should be fine for most applications.
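A minimal Scala sketch of the sampler, assuming scala.util.Random as the source of uniform samples:

import scala.util.Random

// Knuth's sampler: multiply uniform samples together until the product
// drops below e^(-lambda); the number of multiplications, minus one, is the sample.
def poissonSample(lambda: Double, rng: Random = new Random()): Int = {
  val limit = math.exp(-lambda)
  var p     = 1.0
  var k     = 0
  do {
    p *= rng.nextDouble()
    k += 1
  } while (p > limit)
  k - 1
}

Bear in mind that the number of multiplications grows linearly with λ, and e^(-λ) underflows to zero for λ much above 700, so this is only sensible for modest means.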

Tuesday, February 14, 2017

Tweaking TF-IDF


TF-IDF is a simple statistic (with a few variants) used in information retrieval within a corpus of text. The funny thing is that although it works well, nobody seems to be sure why. Robertson says in On theoretical arguments for IDF:

"The number of papers that start from the premise that IDF is a purely heuristic device and ends with a claim to have provided a theoretical basis for it is quite startling."

The simplest expression for TF-IDF is:

(TF) . (IDF) = f_{t,d} . log(N / |{d ∈ D : t ∈ d}|)

where t is a "term" (word, n-gram etc); d is a document in corpus D; N is the total number of documents in corpus D; and f_{t,d} is the frequency of term t in document d. (Often the term frequency component is actually some function of the raw f_{t,d}.) "The base of the logarithm is not in general important" (Robertson).
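As a sketch in Scala, with the corpus modelled naively as a collection of tokenised documents (the names here are purely illustrative):

type Document = Seq[String]

def tfIdf(term: String, doc: Document, corpus: Seq[Document]): Double = {
  val tf           = doc.count(_ == term).toDouble         // raw term frequency f_{t,d}
  val docFrequency = corpus.count(_.contains(term))        // |{d ∈ D : t ∈ d}|
  if (docFrequency == 0) 0.0
  else tf * math.log(corpus.size.toDouble / docFrequency)  // TF x IDF
}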

It has been mooted that the IDF component looks like the log of a probability. That is, if we define the number of documents containing term t_i to be n_i, then we could expect the probability of a given document containing t_i to be:

P(t_i) = P(t_i occurs in d) = n_i / N

and the IDF term looks like:

idf(t_i) = - log P(t_i)

The nice thing about this is that "if we assume the occurrences of different terms in documents are statistically independent, then addition is the correct thing to do with the logs":

idf(t_1 ∧ t_2) = -log P(t_1 ∧ t_2)
               = -log(P(t_1) P(t_2))
               = -(log P(t_1) + log P(t_2))
               = idf(t_1) + idf(t_2)

Thus we can add IDF terms to give us a total score.

Implementations

Lucene used to use this implementation but appears to be moving to something called BM25, with which they're getting good results. I tried BM25 but unfortunately it was prone to over-linking my documents compared to the classic formula.

So, I returned to the classic implementation but thought again about an approximation I had made. I had capped the maximum document frequency of a term at 1000, simply for practical reasons of storage; terms occurring more often than that are discarded, as anything that common is unlikely to be useful in this particular domain. Note that I could not filter out a list of stopwords as much of my corpus was not English - the hope was that stopwords would naturally drop out under this restriction. Even with this limit I still had over 90 million terms.

The result was that the difference in weight between the rarest term and the most common term we record was not that great. Since I am comparing documents, the minimum document frequency for a term to be useful is 2 (a frequency of 1 obviously can't link two documents). If the maximum document frequency is 1000 then the ratio of the IDFs would be:

log (N / 2) / log (N / 1000)

which for 90 million documents is only about 1.5. No wonder the results for my document comparisons were not great. Now, if I set N = 1000 + 1 (the +1 to avoid a nasty divide by 0), the ratio between the weighting for the rarest term and the most common is about 6218 which seems more reasonable. And sure enough, more documents seemed to match. (Caveat: I say "seemed to match" as I only examined a sample then extrapolated. There are far too many documents to say definitely that the matching is better).
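A quick sanity check of those two ratios, plus the capped IDF I ended up using (the cap of 1000 is the document-frequency limit described above):

val cap = 1000.0
val n   = 90e6

// IDF using the full corpus size versus IDF using the cap as the effective N.
def idfFull(df: Double): Double   = math.log(n / df)
def idfCapped(df: Double): Double = math.log((cap + 1) / df)

println(idfFull(2) / idfFull(cap))       // ~1.5  - barely any spread between rare and common terms
println(idfCapped(2) / idfCapped(cap))   // ~6218 - far more discriminating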


Wednesday, January 25, 2017

Poisoned Poisson


I needed to code up a simple implementation of the probability mass function (PMF) for the boring old Poisson distribution. Five minutes work, or so I thought.

My first cut of the code looked like this:

  def poissonWithSmallMean(x: Int, mu: Double): Double = Math.pow(mu, x) * Math.exp(-mu) / factorial(x)

where factorial is a very simple implementation of factorial using tail recursion. I could have implemented it in a more efficient manner using caching etc, but I wasn't expecting too much trouble.
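Something like this, say (a sketch of the sort of tail-recursive factorial I mean, returning a Double so the division above works directly):

  import scala.annotation.tailrec

  def factorial(n: Int): Double = {
    @tailrec
    def loop(i: Int, acc: Double): Double = if (i <= 1) acc else loop(i - 1, acc * i)
    loop(n, 1.0)   // overflows to Infinity at around n = 171, which is part of the problem below
  }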

Going to the excellently named StatTrek to get some test values, my test looked like:

    poissonWithSmallMean(10, 12) shouldEqual 0.104837255883659 +- tolerance

and it passed. Great. 

Things started getting strange with slightly larger numbers. StatTrek was telling me that with x=998 and mu=999, we should get the modestly-sized answer of 0.0126209223360251. But my code was blowing up.

It's not hard to see why. That factorial goes crazy pretty quickly. Given that the expected value is not particularly large and not terribly small, how do we compute it simply? 

The answer is here. It depends on logs. If:

n! = n * (n-1) * (n-2) * ... * 1

then

log n! = log n + log(n-1) + log(n-2) + ... + log(1)

Now, our equation to calculate the PMF looks like this:

  def poissonWithLargeMean(x: Int, mu: Double): Double = {
    val logged = (x * Math.log(mu)) - mu - logFactorial(x)   // log of the PMF
    Math.exp(logged)
  }

where logFactorial is just a simple implementation of the equation for log n! above. This handles x=998, mu=999 beautifully.
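Again, just a sketch of the kind of thing I mean by logFactorial:

  // log n! = log 1 + log 2 + ... + log n
  def logFactorial(n: Int): Double = (1 to n).map(i => Math.log(i.toDouble)).sum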

We could implement logFactorial in a much more efficient manner by using the Stirling approximation (log n! ≈ n log n). This is a handy equation which incidentally also gives us the limit on comparison sorts. Take a look at John D Cook's blog in the link above to see how to do this. But this simple solution is fine for me for now.
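For what it's worth, a sketch of that idea using a slightly fuller form of Stirling's approximation, log n! ≈ n log n - n + 0.5 log(2πn), which is plenty accurate for the sizes of n above:

  def logFactorialStirling(n: Int): Double =
    if (n < 2) 0.0
    else n * Math.log(n) - n + 0.5 * Math.log(2 * Math.PI * n)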


Friday, January 20, 2017

HBase File Structure


Some notes from a colleague who knows HBase better than I do.

Quick overview of how HBase stores data

When HBase batch writes in-memory data to disk, it may do so in separate files. Each file is sorted by the key.

So, say HBase persists data with keys, A, F, B. It sorts them so the file looks like:

[A, B, F]

Some time later, HBase persists G, P and M. Now there are two files that look like:

[A, B, F],  [G, M, P]

At some later time still, data with keys C, R and D arrive. Key R clearly comes after the last key already written to the files (P), but C and D fall in the middle of the first file's range. So, what does HBase do? It creates a new file with [C, D, R] in it. To summarize, the files now look like this:

[A, B, F], [G, M, P], [C, D, R]

Even later, a client wants to find the value for key D. Assuming it's not cached in memory, where does HBase look for it? Well, although all data within a file is ordered (so we can call off the search early if it's not where we would expect it in the file), we don't know which file D is in. HBase must search all files even if it doesn't search through all of a file.

Compaction

To help matters, HBase can compact these three files into a single sorted one:

[A, B, C, D, F, G, M, P, R]

Well, that makes searching much easier.

Read-Write Patterns

How can we leverage this? Well, in certain read-write patterns, we could force a compaction at an appropriate point. For instance, I'm using HBase to store a dictionary I build when processing a corpus of text. Later, I refer to that dictionary but I never update or add any more data. So, after I have finished my last write, I can call:

hbaseConnection.getAdmin.majorCompact(TABLE_NAME)

After compacting the table, my read throughput went up by a factor of 4.
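Spelled out a little more fully, the call looks something like this (the table name is just illustrative; note that majorCompact only requests the compaction, which HBase then runs in the background):

import org.apache.hadoop.hbase.TableName

val TABLE_NAME = TableName.valueOf("dictionary")   // whatever your table is actually called
val admin      = hbaseConnection.getAdmin
try {
  admin.majorCompact(TABLE_NAME)   // asynchronous: schedules a major compaction of the table's files
} finally {
  admin.close()
}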

For more information on HBase compaction see here.



Thursday, December 22, 2016

Spark and the GC


Spark is memory hungry and that means you need to consider the garbage collector when tuning.

There are a number of GCs to choose from that may or may not be appropriate for your job. Some say that G1 may give you (soft) guarantees on maximum latency but is not so good for batch jobs. Others point out that CMS may give lower pauses than ParallelGC (the default on 64-bit machines) but that ParallelGC has better overall throughput - and since we're running a batch job rather than a web server, ParallelGC may suit Spark better.

More logs than a lumberjack

Whichever GC you use, you'll need to turn logging on and analyse the logs.

You can see objects being promoted from young to old generation (see here and here) but it's inferred rather than stated explicitly. At a young GC, you'll see something like:

[PSYoungGen: BEFORE_YOUNG->AFTER_YOUNG(CAPACITY_YOUNG)] BEFORE_HEAP->AFTER_HEAP(CAPACITY_HEAP)

This describes the memory usage in the young generation before the collection (BEFORE_YOUNG), after it (AFTER_YOUNG), and the young generation's capacity (CAPACITY_YOUNG).

It also shows the usage of the heap as a whole before (BEFORE_HEAP) and after (AFTER_HEAP) the collection, and the heap's total capacity (CAPACITY_HEAP).

Note that the drop in the young generation is not necessarily equal to the drop in the heap as a whole, that is:

Δ = (BEFORE_YOUNG - AFTER_YOUNG) - (BEFORE_HEAP - AFTER_HEAP)

is generally not zero. This is because not all of the young generation was actually collected: Δ is the amount that was promoted to the old generation instead.

There was a lot of young generation GC activity and not much in the old generation, so in an attempt to avoid premature promotion on JVMs with 8GB of memory I set -XX:MaxNewSize=6G -XX:SurvivorRatio=6 - but instead of taking 5 hours, the job took 6.

Trying G1GC took just over 6 hours. And setting -XX:MaxNewSize=1G -XX:SurvivorRatio=3 just had to be killed as it looked like it was going to take days. So, I had to look elsewhere.

Disks

You can also check your disk access times by using a script taken from here, namely:

dd if=/dev/zero of=/tmp/output.img bs=8k count=256k
rm /tmp/output.img

A decent HDD will typically give you about 300MB/s, a bog-standard SSD (my NUC at home) about 500MB/s.

The best way to avoid Garbage Collection...

... is not to create any garbage. This might not always be possible but you can minimize GC. Kryo will compress objects amazingly well by replacing the verbose String that represents their FQNs with just a byte or two. However, it will only compress components of the class you register with it, not a deep tree of components. For instance, if you register org.apache.spark.mllib.linalg.SparseVector, you'd do well to also register Array[Int] and Array[Double] as these are what hog memory.

It is not sufficient to just register Array[_].

You can put something like sparkConf.set("spark.kryo.registrationRequired", "true") in your code to see what needs to be explicitly added. This will cause runtime exceptions to be thrown until everything to be (un)compressed has been explicitly registered.
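Putting that together, the configuration looks something like this (SparseVector is the example from above; the class list is of course whatever your own job actually shuffles):

import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.SparseVector

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")   // fail fast on anything not registered
  .registerKryoClasses(Array(
    classOf[SparseVector],
    classOf[Array[Int]],      // the components that actually hog the memory
    classOf[Array[Double]]
  ))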

After these changes, the shuffle went from some 2.3TB to 300GB and consequently the time for the job dropped from about 5 hours to less than 45 minutes. With less memory to churn there was less GC and this makes a huge difference to how long your Spark job takes.

Friday, December 16, 2016

More Spark tuning


I've spent some time trying to tune matrix multiplication in Spark. I had 32 executors each with 4 threads and sampled the stacks with jstack over a few minutes with a handy little script:

> cat ~/whatsEveryoneDoing.sh
for BOX in `cat ~/boxes.txt` ; do { echo $BOX ; ssh $BOX "for PID in \`ps -ef | grep $1 | grep jdk8 | grep -v bash | awk '{print \$2}'\` ; do { sudo -u yarn jstack \$PID | grep -A45 ^\\\"Exec ; } done ; echo " 2>/dev/null ; } done

where ~/boxes.txt is just a file with the addresses of all the boxes in the cluster and the argument with which you invoke it is the application ID of the job.

I noticed that of my 128 executor threads, in each sample 50 or so were contending for the lock at:

org.apache.spark.storage.memory.MemoryStore.reserveUnrollMemoryForThisTask(MemoryStore.scala:570)

(This is Spark 2.0.2).

So, reducing --executor-cores from 4 to 2 while only increasing the number of executors from 32 to 48 had a big impact. The runtime was reduced from about 7.5 hours to 5 hours.
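For reference, the equivalent settings expressed through SparkConf rather than the spark-submit flags (assuming YARN and no dynamic allocation):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.executor.cores", "2")        // fewer threads per executor contending for the MemoryStore lock
  .set("spark.executor.instances", "48")   // more executors to keep the overall parallelism up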