Monday, September 18, 2017

Spark Partitions

Some miscellaneous Spark/GraphX partitioning notes that I have made.

Co-partitioned and co-located are not the same thing

"Even if the same partitioner is used for an RDD that needs to be partitioned within the current job as was used to partition another RDD that is needed in the current job but whose partitioning was persisted in a prior job, then the RDDs will still be co-partitioned but won't necessarily be co-located."

"There are two things here. One is data co-partition, and then data co-location.

"As long as you specify the same hash partitioner, a1 and b1 will be co-partitioned (i.e. partitioned the same way). The partitions might not be co-located, so your job will still incur network traffic, but you do reduce a whole round of shuffle.

"If a1 and b1 are put into memory by different jobs, there is no guarantee their partitions will be co-located, even though they are co-partitioned.

"What if you add a node to the cluster, or a node dies? Or what if a node is busy and can never be run anything - are you going to block the entire program forever so you could put a partition on that node?

"You can still write your program in a way that does mostly local joins. And even if the joins need to fetch blocks from remote nodes, the joins still avoid an extra round of shuffle, which is much more expensive than just fetching data."

From a Google Groups discussion (here).
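To make the distinction concrete, here is a toy, single-process sketch in Python. It is not Spark's implementation: hash_partition, partition_by_key and schedule are hypothetical helpers, and the seeded random "scheduler" is an assumption standing in for wherever each job happens to place its partitions. Because both datasets use the same partitioner, partition i of a1 and partition i of b1 hold the same keys (co-partitioned), but nothing forces the two partitions onto the same node (co-located).

```python
import random

def hash_partition(key, num_partitions):
    # Like a hash partitioner: the partition index depends only on the key.
    return hash(key) % num_partitions

def partition_by_key(records, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[hash_partition(key, num_partitions)].append((key, value))
    return partitions

def schedule(num_partitions, nodes, seed):
    # Hypothetical scheduler: each job may place partition i on any node.
    rng = random.Random(seed)
    return [rng.choice(nodes) for _ in range(num_partitions)]

a1 = partition_by_key([(k, "a") for k in range(20)], 4)
b1 = partition_by_key([(k, "b") for k in range(20)], 4)

# Co-partitioned: partition i of a1 and partition i of b1 hold the same keys.
co_partitioned = all(
    {k for k, _ in pa} == {k for k, _ in pb} for pa, pb in zip(a1, b1)
)

nodes = ["node1", "node2", "node3"]
placement_a = schedule(4, nodes, seed=1)  # the job that cached a1
placement_b = schedule(4, nodes, seed=2)  # a different job that cached b1
co_located = [na == nb for na, nb in zip(placement_a, placement_b)]
print(co_partitioned, co_located)
```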


Partitioning graphs presents another interesting challenge. Edge cuts (i.e., partitioning vertices) are a viable but generally inefficient way to partition a graph.

Many real-world graphs follow a power law, for example something like Zipf's law if you're processing natural language ("Zipf's law states that given some corpus of natural language utterances, the frequency of any word is inversely proportional to its rank in the frequency table." from Wikipedia). Consequently, a few vertices have a huge number of edges and partitioning by vertex will be 'lumpy'.
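A quick way to see the lumpiness is to count edges per partition for a made-up graph whose degrees fall off like Zipf's law. In this hypothetical Python sketch, partitioning by source vertex stands in for an edge cut, and dealing edges out round-robin is a deterministic stand-in for a vertex cut (GraphX's RandomVertexCut hashes the source and destination IDs instead).

```python
from collections import Counter

NUM_PARTITIONS = 4

# Hypothetical power-law graph: vertex v (1..50) has 100 // v edges,
# so vertex 1 has 100 edges while vertex 50 has only 2.
edges = [(v, 1000 + e) for v in range(1, 51) for e in range(100 // v)]

# Edge cut: every edge lives with its source vertex, so one partition
# inherits all of vertex 1's edges.
edge_cut = Counter(src % NUM_PARTITIONS for src, _ in edges)

# Vertex cut: edges are dealt out round-robin, so partitions stay balanced
# and busy vertices are replicated across partitions instead.
vertex_cut = Counter(i % NUM_PARTITIONS for i in range(len(edges)))

print(sorted(edge_cut.values()), sorted(vertex_cut.values()))
```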

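GraphX allows you to choose your partitioning strategy. It offers these (in org.apache.spark.graphx.PartitionStrategy): RandomVertexCut, CanonicalRandomVertexCut, EdgePartition1D and EdgePartition2D.

A comparison of these strategies can be found in "Spark GraphX in Action", p214.
You can find my experience of EdgePartition2D here.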

"Introducing a .repartition will increase the amount of work that the Spark engine has to do, but the benefit can significantly outweigh that cost." (from here). In this example, the data was "lumpy" causing a single executor to do most of the work.

I've also seen repartitioning on a given field used to optimize joins. See my previous post, where repartitioning a DataFrame by field X and writing it to a file made joining on X more efficient.


"join has the same semantics as in sql join, e.g. every row after the join is (k, v1, v2), where v1 is from rdd1, and v2 is from rdd2.

cogroup just groups values of the same key together, and every row looks like (k, seq1, seq2), where seq1 contains all the values having the same key from rdd1."

(From another Google Groups discussion).
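In plain Python, the two semantics look roughly like this (a sketch over lists of pairs, not Spark's code; the helper functions are hypothetical). Writing join in terms of cogroup mirrors the fact that RDD.join is built on cogroup in Spark's PairRDDFunctions.

```python
from collections import defaultdict

def cogroup(rdd1, rdd2):
    """Every key maps to (values from rdd1, values from rdd2)."""
    left, right = defaultdict(list), defaultdict(list)
    for k, v in rdd1:
        left[k].append(v)
    for k, v in rdd2:
        right[k].append(v)
    return {k: (left.get(k, []), right.get(k, [])) for k in left.keys() | right.keys()}

def join(rdd1, rdd2):
    """Inner join: one (k, v1, v2) row per pair of matching values."""
    return [(k, v1, v2)
            for k, (seq1, seq2) in sorted(cogroup(rdd1, rdd2).items())
            for v1 in seq1
            for v2 in seq2]

rdd1 = [(1, "a"), (1, "b"), (2, "c")]
rdd2 = [(1, "x"), (3, "y")]
print(join(rdd1, rdd2))                  # [(1, 'a', 'x'), (1, 'b', 'x')]
print(sorted(cogroup(rdd1, rdd2).items()))
```

Keys 2 and 3 vanish from the join because one side has no values, but they still appear in the cogroup with an empty sequence.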

Note that Datasets have slightly different semantics. The outer joins of RDDs return RDDs that contain Options. Dataset's join returns a DataFrame (a Dataset[Row]) whose rows can be accessed as usual, while its @Experimental joinWith returns a Dataset[(T, U)]. Note that, in an outer joinWith, T or U can be null. No nice Options for us here.


"In Apache Hadoop, the grouping is done in two places - the partitioner, which routes map outputs to reduce tasks, and the grouping comparator, which groups data within a reduce task.  Both of these are pluggable per-job.  The sorting is pluggable by setting the output key comparator...

"The order of the values within a reduce function call, however, is typically unspecified and can vary between runs. Secondary sort is a technique that allows the MapReduce programmer to control the order that the values show up within a reduce function call." (from here). This can then help the next stage.

Sorting also helped us when writing Parquet files to HDFS.
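The secondary-sort pattern quoted above can be sketched on a single machine in Python, assuming made-up (customer, timestamp, amount) records: the partitioner routes on the natural key only, the sort uses the full composite key, and the grouping uses the natural key, so each reducer sees a customer's values in timestamp order. The records and the ord-sum partitioner are hypothetical.

```python
from itertools import groupby

# Hypothetical records: (customer, timestamp, amount)
records = [("bob", 3, 30), ("alice", 2, 20), ("bob", 1, 10), ("alice", 5, 50), ("bob", 2, 20)]
NUM_REDUCERS = 2

def partitioner(composite_key):
    # Route on the natural key only, so all of a customer's records meet in one reducer.
    customer, _ = composite_key
    return sum(map(ord, customer)) % NUM_REDUCERS

reducers = [[] for _ in range(NUM_REDUCERS)]
for customer, ts, amount in records:
    reducers[partitioner((customer, ts))].append(((customer, ts), amount))

results = {}
for reducer_input in reducers:
    # Sort comparator: the full composite key (customer, timestamp).
    reducer_input.sort(key=lambda kv: kv[0])
    # Grouping comparator: the natural key (customer) only.
    for customer, group in groupby(reducer_input, key=lambda kv: kv[0][0]):
        results[customer] = [amount for _, amount in group]  # in timestamp order
print(results)
```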

An interesting trick to improve sort performance can be found here: "Sorting in general has good cache hit rate due to the sequential scan access pattern. Sorting a list of pointers, however, has a poor cache hit rate because each comparison operation requires dereferencing two pointers that point to randomly located records in memory. So how do we improve the cache locality of sorting? A very simple approach is to store the sort key of each record side by side with the pointer. For example, if the sort key is a 64-bit integer, then we use 128-bit (64-bit pointer and 64-bit key) to store each record in the pointers array."
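The layout trick can be sketched in Python by packing each 64-bit key and 64-bit "pointer" (here just a list index) into one 128-bit integer, so sorting compares self-contained values without dereferencing anything. Python will not show the cache benefit; this only illustrates the data layout, and it assumes non-negative keys below 2^64.

```python
# Records live elsewhere; we only sort "pointers" to them (here, list indices).
records = [("carol", 42), ("alice", 7), ("bob", 19), ("dave", 3)]

def sort_pointers_by_dereference(records):
    # Each comparison has to follow the pointer to fetch the sort key.
    return sorted(range(len(records)), key=lambda i: records[i][1])

def sort_with_prefix(records):
    # Key in the high 64 bits, pointer in the low 64 bits: one 128-bit value per record.
    packed = [(key << 64) | i for i, (_, key) in enumerate(records)]
    packed.sort()  # comparisons never touch `records`
    return [p & ((1 << 64) - 1) for p in packed]

print(sort_pointers_by_dereference(records), sort_with_prefix(records))
```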


The DAG in the Spark UI may not be as linear as you'd expect. That is Spark's prerogative: "Stages that are not interdependent may be submitted to the cluster for execution in parallel: this maximizes the parallelization capability on the cluster. So if operations in our dataflow can happen simultaneously we will expect to see multiple stages launched" (from here).

Tuesday, September 12, 2017

Neural Networks are just Matrices

DeepLearning4J brings neural nets to Java programmers. In the getting started section, they suggest running the XorExample. This is a neural net that, given XOR inputs and outputs, learns its logic. This is non-trivial for a simple neural net (see here) as XOR's true and false outputs are not linearly separable, so a network without a hidden layer cannot learn it. DL4J provides a way of making more complicated neural nets but hides a lot of detail.


The network in XorExample "consists in 2 input-neurons, 1 hidden-layer with 4 hidden-neurons, and 2 output-neurons... the first fires for false, the second fires for true".

But instead of talking about neurons, it's much easier to think of this neural net as matrices (at least if you're familiar with simple linear algebra).

So, imagine this neural net of just:

  • A 4 x 2 matrix of the inputs ("features").
  • A hidden layer that is a 2 x 4 matrix 
  • A layer that is a 4 x 2 matrix that yields the output.

We need to apply a function to the result of each multiplication before it is multiplied by the next matrix, but that's essentially it.

Multi-layer Neural Nets in a few lines of Python

To do this, I'm not going to use TensorFlow or other frameworks dedicated to neural nets. I'll just use Numpy as basically syntactic sugar around manipulating arrays of arrays.

Also, I'll present a more intuitive approach to the maths. A more thorough analysis can be found on Brian Dohansky's blog here.

Finally, I tried to do this in Scala using the Breeze linear algebra library but it was just so much easier to do it in Python and Numpy as it ran in a fraction of the time it took Scala to even compile.

The code

As mentioned, we need Numpy.

import numpy as np

Now, let's take the input to an XOR operation (our first matrix):

features = np.matrix('0.0, 0.0;'
                     '1.0, 0.0;'
                     '0.0, 1.0;'
                     '1.0, 1.0')

and the expected output:

labels = np.matrix('1.0, 0.0;'
                   '0.0, 1.0;'
                   '0.0, 1.0;'
                   '1.0, 0.0')

(Remember that this is not trying to be a truth table. Instead, a 1 in the first column indicates that the output is false and a 1 in the second column indicates that it's true, matching the two output neurons described above.)

We need those other 2 matrices. It doesn't matter what their values are initially as we'll correct them whatever they are. So, let's choose some random values but in a well-defined shape:

weightsLayer1 = np.random.rand(2, 4)
weightsLayer2 = np.random.rand(4, 2)

We also need the gradients of the weights, plus the biases. We could have subsumed the biases into our weight matrices (that's mathematically equivalent) but the DeepLearning4J example doesn't do this so we won't either. The weight gradients and the biases for the first and second layers respectively are:

_0_W = np.zeros([2, 4])
_0_b = np.zeros([1, 4])
_1_W = np.zeros([4, 2])
_1_b = np.zeros([1, 2])

Finally, we need a step value and a batch size:

learning_rate = 0.1
mini_batch_size = np.shape(features)[0]

Now we can do our first multiplication (or forward propagation):

    s0 = features * weightsLayer1
    s0 += _0_b   

But we need to apply a function to this (an activation function). In this example, we'll use a sigmoid function. It has a simple derivative that we'll need later.

def f_sigmoid(X):
        return 1 / (1 + np.exp(-X))

So, now we can apply the sigmoid activation function to the first layer:

sigmoided = f_sigmoid(s0)

This we'll feed into the next layer with another matrix multiplication:

    s1 = sigmoided * weightsLayer2
    s1 += _1_b

Now we need our second activation function to apply to this matrix. It's the softmax function, which normalizes each row so that its values sum to 1, allowing us to treat each row as a probability distribution. It looks like this (as stolen from Brian):

def f_softmax(X):
    Z = np.sum(np.exp(X), axis=1)
    Z = Z.reshape(Z.shape[0], 1)
    return np.exp(X) / Z

We apply this to the output from the previous layer and find the delta with what the output should be:

softmaxed = f_softmax(s1)
delta = softmaxed - labels

We propagate the delta back through this layer's weights, transposing appropriately:

epsilonNext = (weightsLayer2 * delta.T).T

Now, it's a good job that the sigmoid function has an easy derivative: σ(x)(1 - σ(x)). In code it looks like this:

dLdz = np.multiply(sigmoided, (1-sigmoided)) 

where, note, this is an element-wise multiplication, not a matrix multiplication. Similarly, we calculate the back propagation (which "allows the information from the cost to then flow backward through the network in order to compute the gradient" [1]) using the same Numpy operation:

backprop = np.multiply(dLdz, epsilonNext)

Intuitively, you can think of this as each element of the weighted delta only being multiplied by the gradient of the function at that element.

"The term back-propagation is often misunderstood as meaning the whole learning algorithm for multi layer neural networks. Actually, back-propagation refers only to the method for computing the gradient, while another algorithm such as stochastic gradient descent, is used to perform learning using the gradient." [1]

Anyway, we can now update the gradients and the weights:

    _0_W = (features.T * backprop) + _0_W
    _1_W = (sigmoided.T * delta) + _1_W
    _0_b = _0_b - (learning_rate * np.sum(backprop, 0))
    _1_b = _1_b - (learning_rate * np.sum(delta, 0))
    weightsLayer1 = weightsLayer1 - (learning_rate * _0_W)
    weightsLayer2 = weightsLayer2 - (learning_rate * _1_W)

Note that the bias updates are derived from the column sums of the backprop and delta matrices.

Now, repeat this some 500 times and the output looks like:

print(softmaxed)

[[  1.00000000e+00   3.16080274e-36]
 [  1.50696539e-52   1.00000000e+00]
 [  1.64329041e-30   1.00000000e+00]
 [  1.00000000e+00   8.51240114e-40]]

which for all intents and purposes is the same as labels. QED.
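As a sanity check on the derivation, here is a self-contained sketch that (a) compares the backprop gradients above against finite differences and (b) runs plain gradient descent with them. It rests on assumptions of my own: the loss is cross-entropy (softmaxed - labels is its gradient with respect to s1), it uses NumPy arrays with @ instead of np.matrix, it fixes a random seed, and it recomputes the gradients on every iteration rather than accumulating them into _0_W and _1_W as the snippet above does. It only checks that the loss goes down; whether 500 iterations fully learn XOR from a given start is not guaranteed.

```python
import numpy as np

def f_sigmoid(X):
    return 1.0 / (1.0 + np.exp(-X))

def f_softmax(X):
    # Subtracting each row's max does not change the result but avoids overflow.
    e = np.exp(X - X.max(axis=1, keepdims=True))
    return e / e.sum(axis=1, keepdims=True)

features = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
labels = np.array([[1.0, 0.0], [0.0, 1.0], [0.0, 1.0], [1.0, 0.0]])

def forward(W1, b1, W2, b2):
    sigmoided = f_sigmoid(features @ W1 + b1)
    softmaxed = f_softmax(sigmoided @ W2 + b2)
    return sigmoided, softmaxed

def loss(W1, b1, W2, b2):
    # Assumed loss: cross-entropy summed over the mini-batch.
    _, softmaxed = forward(W1, b1, W2, b2)
    return -np.sum(labels * np.log(softmaxed))

def gradients(W1, b1, W2, b2):
    sigmoided, softmaxed = forward(W1, b1, W2, b2)
    delta = softmaxed - labels                            # dLoss/ds1
    epsilonNext = delta @ W2.T                            # dLoss/d(sigmoided)
    backprop = sigmoided * (1 - sigmoided) * epsilonNext  # dLoss/ds0
    return [features.T @ backprop, backprop.sum(axis=0, keepdims=True),
            sigmoided.T @ delta, delta.sum(axis=0, keepdims=True)]

def numerical_gradient(params, k, eps=1e-6):
    # Central finite differences, one entry of parameter k at a time.
    P = params[k]
    grad = np.zeros_like(P)
    for idx in np.ndindex(*P.shape):
        original = P[idx]
        P[idx] = original + eps
        up = loss(*params)
        P[idx] = original - eps
        down = loss(*params)
        P[idx] = original
        grad[idx] = (up - down) / (2 * eps)
    return grad

rng = np.random.default_rng(0)
params = [rng.random((2, 4)), np.zeros((1, 4)), rng.random((4, 2)), np.zeros((1, 2))]

analytic = gradients(*params)
gradient_check_passed = all(
    np.allclose(analytic[k], numerical_gradient(params, k), atol=1e-5) for k in range(4))

learning_rate = 0.1
start_loss = loss(*params)
for _ in range(500):
    params = [P - learning_rate * g for P, g in zip(params, gradients(*params))]
end_loss = loss(*params)
print(gradient_check_passed, start_loss, end_loss)
print(forward(*params)[1])
```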

Some things we ignored

For brevity, I didn't address the score (that tells us how close we are to our desired values).

Also, any attempt at regularization (which attempts to avoid over-fitting) was ignored. The DL4J XorExample sets the L1 (taxicab, or Manhattan, norm) and L2 (Euclidean norm) regularization coefficients to zero, so we're true to the original there.

[1] Deep Learning (Goodfellow et al)

Sunday, September 10, 2017

Givens Rotations

The problem

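If A is a matrix whose columns are {v1, v2, v3, ... vn}, then any point in its span can be described with an appropriate choice of x thus:

$$A x = \begin{bmatrix} v_1 & v_2 & \cdots & v_n \end{bmatrix} \begin{bmatrix} x_1 \\ x_2 \\ \vdots \\ x_n \end{bmatrix} = x_1 v_1 + x_2 v_2 + \cdots + x_n v_n$$

where x1, x2, ... xn are the coefficients applied to each of the vectors vi.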

What this means is that any vector, b, in the span of A can be expressed as A x. If b is not in the span, we can instead look for the x that minimizes ||b - A x||: this is the least squares problem, beloved of loss functions in machine learning. "The advantage of an algorithm for least squares is that it can be used even when the matrix-vector equation has no solution" [1].
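NumPy's np.linalg.lstsq solves exactly this minimization. In the small made-up example below, b is not in the plane spanned by A's columns, so A x = b has no solution, yet least squares still finds the closest point, and the leftover error is orthogonal to every column of A.

```python
import numpy as np

# The columns of A span a plane in R^3; b is deliberately not in that plane.
A = np.array([[1.0, 0.0],
              [0.0, 1.0],
              [1.0, 1.0]])
b = np.array([1.0, 2.0, 0.0])

x, residuals, rank, _ = np.linalg.lstsq(A, b, rcond=None)
closest = A @ x      # the point in the span closest to b
error = b - closest  # orthogonal to every column of A
print(x, np.linalg.norm(error))
```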

One algorithm: Gram Schmidt

The approach used in "Coding the Matrix" is the Gram Schmidt method. The circuitous route goes like this:
  1. We want to find the point in the space defined by the span of vectors {v1, v2, v3, ... vn} that is closest to a vector we'll call b. We will assume that vi is orthogonal to vj for all i ≠ j. If they're not orthogonal, it's not hard to turn them into vectors that are, but that's another problem.
  2. Given a k-dimensional vector space, V, the vector b can be expressed as the sum of two vectors - one that is in V and one that is orthogonal to V. In Klein's notation, this says b = b||V + b⊥V. "The point in V closest to b is b||V and the distance is ||b⊥V|| ... it suffices to find b⊥V, for we can then obtain b||V." [1].
  3. Given the orthogonal set of vectors {v1, v2, v3, ... vn}, b⊥V equals b with its projection onto each vi shaved off. This creates a recursive algorithm that looks like:

    $$\sigma_i = \frac{\langle b_{i-1}, v_i \rangle}{\langle v_i, v_i \rangle}, \qquad b_i = b_{i-1} - \sigma_i v_i$$

    where b0 = b and, after the last vector, bn = b⊥V.
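  4. Now b can be expressed as

    $$b = \sigma_1 v_1 + \cdots + \sigma_n v_n + b^{\perp V}$$

    which, just like the matrix at the top of this post, can be expressed as:

    $$b = \begin{bmatrix} v_1 & \cdots & v_n & b^{\perp V} \end{bmatrix} \begin{bmatrix} \sigma_1 \\ \vdots \\ \sigma_n \\ 1 \end{bmatrix}$$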
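Steps 3 and 4 translate almost line for line into NumPy. A sketch, assuming the vectors passed in really are mutually orthogonal (the made-up v1 and v2 below are); project_orthogonal is a hypothetical helper name.

```python
import numpy as np

def project_orthogonal(b, vs):
    """Return (sigmas, b_perp) for a list of mutually orthogonal vectors vs."""
    sigmas = []
    b_i = np.array(b, dtype=float)
    for v in vs:
        sigma = np.dot(b_i, v) / np.dot(v, v)  # sigma_i = <b_{i-1}, v_i> / <v_i, v_i>
        b_i = b_i - sigma * v                  # b_i = b_{i-1} - sigma_i v_i
        sigmas.append(sigma)
    return np.array(sigmas), b_i

v1 = np.array([1.0, 1.0, 0.0])
v2 = np.array([1.0, -1.0, 0.0])  # orthogonal to v1
b = np.array([3.0, 1.0, 5.0])

sigmas, b_perp = project_orthogonal(b, [v1, v2])
# Step 4: b = [v1 v2 b_perp] [sigma1, sigma2, 1]^T
rebuilt = np.column_stack([v1, v2, b_perp]) @ np.append(sigmas, 1.0)
print(sigmas, b_perp, rebuilt)
```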
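Similarly, if {v1, v2, v3, ... vn} are not orthogonal, we can invoke the same recursive algorithm to generate the original vectors from a linear combination of mutually orthogonal vectors, {v1*, v2*, v3*, ... vn*}. It would look like this:

$$A = \begin{bmatrix} v_1^* & v_2^* & \cdots & v_n^* \end{bmatrix} \begin{bmatrix} 1 & \sigma_{12} & \cdots & \sigma_{1n} \\ 0 & 1 & \cdots & \sigma_{2n} \\ \vdots & & \ddots & \vdots \\ 0 & 0 & \cdots & 1 \end{bmatrix}$$

where σij is the coefficient of vi* when vj is projected onto it.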



QR factorization reduces an m x n matrix A to two matrices, Q and R, where Q is an m x n column-orthogonal matrix (i.e., its columns are orthogonal and have unit norm) and R is an upper triangular matrix [1]. In other words, it looks like the matrix above once each vi* has been scaled to unit norm (with the norms moved into the triangular matrix, which becomes R).

Another algorithm: Givens Rotation

Now, there is another way of achieving the same end. We can rotate the matrix such that a particular cell becomes zero. Keep doing this and we get the upper triangular matrix as above. This is called a Givens rotation. Calling each rotation Gi, we have:

$$R = G_k G_{k-1} \cdots G_1 A$$

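But note that each rotation (acting on rows i and j, with j < i) is of the form:

$$G = \begin{bmatrix} 1 & & & & \\ & c & \cdots & -s & \\ & \vdots & \ddots & \vdots & \\ & s & \cdots & c & \\ & & & & 1 \end{bmatrix}$$

where c² + s² = 1. The two c values are on the diagonal at (j, j) and (i, i), s sits at (i, j) and -s at (j, i); all the other diagonal entries are 1 and everything else is 0. Or, to put it more succinctly in Python: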

import numpy as np

def givensRotation(A, i, j):
    # Rotation in the (i, j) plane chosen so that (G A)[i, j] == 0
    G = np.eye(len(A))
    aij = A[i, j]
    ajj = A[j, j]
    r = ((aij ** 2) + (ajj ** 2)) ** 0.5
    c = ajj / r
    s = - aij / r
    G[j, i] = -s
    G[j, j] = c
    G[i, i] = c
    G[i, j] = s
    return G

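Rearranging that equation for A, we get:

$$A = G_1^{-1} G_2^{-1} \cdots G_{k-1}^{-1} G_k^{-1} R$$

but as luck would have it, these G matrices are rotations, so each one is orthogonal and its inverse is just its transpose (G Gᵀ = I because c² + s² = 1. Try it.). Note that G is not its own inverse: G G is not the identity unless s = 0. So:

$$A = G_1^T G_2^T \cdots G_{k-1}^T G_k^T R$$

and all we need to do is keep the G matrices in the right order and set

$$Q = G_1^T G_2^T \cdots G_{k-1}^T G_k^T$$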

we get:

A = Q R

which is our QR decomposition. Booyah.
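Putting the pieces together, here is a sketch of a full QR decomposition using the givensRotation function above, applied column by column to zero everything below the diagonal. givens_qr is a hypothetical helper, and the matrix is the triangle from the Breeze example. It also checks that a Givens rotation is undone by its transpose rather than by itself.

```python
import numpy as np

def givensRotation(A, i, j):
    # As above: (G A)[i, j] == 0
    G = np.eye(len(A))
    aij = A[i, j]
    ajj = A[j, j]
    r = ((aij ** 2) + (ajj ** 2)) ** 0.5
    c = ajj / r
    s = - aij / r
    G[j, i] = -s
    G[j, j] = c
    G[i, i] = c
    G[i, j] = s
    return G

def givens_qr(A):
    R = np.array(A, dtype=float)
    rows, cols = R.shape
    Q = np.eye(rows)
    for j in range(cols):
        for i in range(j + 1, rows):
            if R[i, j] != 0.0:  # already zero: nothing to do (and r could be 0)
                G = givensRotation(R, i, j)
                R = G @ R       # R = G_k ... G_1 A
                Q = Q @ G.T     # Q = G_1^T G_2^T ... G_k^T
    return Q, R

A = np.array([[1.0, 2.0, 3.0],
              [-6.0, 5.0, -4.0],
              [-7.0, 1.0, 9.0]])
Q, R = givens_qr(A)
G = givensRotation(A, 1, 0)
print(np.allclose(Q @ R, A), np.allclose(G @ G.T, np.eye(3)), np.allclose(G @ G, np.eye(3)))
```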

Given a triangle represented in Breeze as:

    import breeze.linalg.{DenseMatrix, DenseVector}

    // One vertex of the triangle per row
    val m = DenseMatrix.zeros[Double](rows = 3, cols = 3)
    m(0, ::) := DenseVector(1.0, 2.0, 3.0).t
    m(1, ::) := DenseVector(-6.0, 5.0, -4.0).t
    m(2, ::) := DenseVector(-7.0, 1.0, 9.0).t

we can transform it with Givens rotations to give us:

[Figure: the triangle before and after the Givens rotations. Red is original, blue is reflection.]

Distributed Givens Rotation using Spark

I tried this in my own toy linear algebra library for Spark, Algernon. Performance currently sucks but as a proof-of-concept it shows it can be done. I avoid Spark's matrix multiplication code for efficiency. Since G is the identity matrix everywhere except in rows i and j, multiplying by it is effectively a no-op for every other row, so we can write:

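    // c and s are assumed to have been calculated already, as in givensRotation above
    val mathOps = implicitly[Numeric[T]]
    import session.sqlContext.implicits._
    import mathOps.mkNumericOps
    ithRow.joinWith(jthRow, '_1 ("j") === '_2 ("j"), "outer").flatMap { case (x, y) =>

      // x * c + y * _s, treating a missing (null) cell as zero
      def calculate(x: MatrixCell[T], y: MatrixCell[T], _s: T): T =
        {if (x == null) mathOps.zero else (x.x * c)} + {if (y == null) mathOps.zero else (y.x * _s)}

      // new i-th cell = c * x + s * y; new j-th cell = c * y - s * x (G[j, i] = -s)
      val newIth = if (x == null) Seq() else Seq(MatrixCell(x.i, x.j, calculate(x, y, s)))
      val newJth = if (y == null) Seq() else Seq(MatrixCell(y.i, y.j, calculate(y, x, -s)))

      newIth ++ newJth
    }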

Here, we're only interested in the i-th and j-th rows, multiplying them with the c and s values we have already calculated.
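The same shortcut can be checked on a single machine. This NumPy sketch (rotate_rows is a hypothetical helper, not part of Algernon) updates only rows i and j and compares the result with the full product G A, reusing the triangle matrix from above.

```python
import numpy as np

def rotate_rows(A, i, j, c, s):
    """Compute G A for the Givens rotation with G[i,i] = G[j,j] = c, G[i,j] = s and
    G[j,i] = -s, touching only rows i and j; every other row is copied unchanged."""
    out = np.array(A, dtype=float)
    row_i, row_j = out[i].copy(), out[j].copy()
    out[i] = c * row_i + s * row_j
    out[j] = c * row_j - s * row_i
    return out

A = np.array([[1.0, 2.0, 3.0],
              [-6.0, 5.0, -4.0],
              [-7.0, 1.0, 9.0]])
i, j = 1, 0
r = np.hypot(A[i, j], A[j, j])
c, s = A[j, j] / r, -A[i, j] / r

G = np.eye(3)
G[i, i], G[j, j], G[i, j], G[j, i] = c, c, s, -s

fast = rotate_rows(A, i, j, c, s)
print(np.allclose(fast, G @ A), fast[i, j])
```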

[1] Coding the Matrix, Klein

Friday, August 25, 2017

More HBase tuning

I've come across two HBase clusters where writes have been extremely fast and reads extremely slow. Note: these reads were not full scans (which would unsurprisingly be slow) but batched get() calls.

This post is about making HBase reads faster. "The rule of thumb is to have your hot data set in RAM. Does not fit? Increase RAM, increase # of servers." (from here).

How much data?

Check how big the HBase table is

hadoop fs -du -h  /hbase/data/default/ 


The more compressed your data, the more likely it is that it will all fit into RAM. Compressing data in the cache may increase CPU usage but reduce IO. You can check which native libraries Hadoop knows about with:

hadoop/bin/hadoop checknative -a

but note: "If the native libs are NOT present, you will see lots of Got brand-new compressor reports in your logs" (from here).

Wait for the cache to fill

It takes time for the cache to be populated. Run vmstat (on Unix-like OSs) and watch the number of blocks read (bi). It will be large to begin with (thousands or tens of thousands) then shrinks after the app has been running for a while, down to basically zero.

You can watch the progress in the region server's logs:

$ grep "BucketCacheStatsExecutor" hbase/logs/hbase-ubuntu-regionserver-ip-172-30-0-139.log
2017-08-23 09:15:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=0, totalSize=5.86 GB, freeSize=4.28 GB, usedSize=1.58 GB, cacheSize=1.56 GB, accesses=49844, hits=2231, IOhitsPerSecond=7, IOTimePerHit=0.03, hitRatio=4.48%, cachingAccesses=49844, cachingHits=2231, cachingHitsRatio=4.48%, evictions=0, evicted=0, evictedPerRun=NaN
2017-08-23 09:20:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=0, totalSize=5.86 GB, freeSize=837.15 MB, usedSize=5.04 GB, cacheSize=4.97 GB, accesses=150478, hits=33665, IOhitsPerSecond=104, IOTimePerHit=0.03, hitRatio=22.37%, cachingAccesses=150478, cachingHits=33665, cachingHitsRatio=22.37%, evictions=1, evicted=13644, evictedPerRun=13644.0
2017-08-23 09:25:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: failedBlockAdditions=5, totalSize=5.86 GB, freeSize=552.66 MB, usedSize=5.32 GB, cacheSize=5.25 GB, accesses=299660, hits=95870, IOhitsPerSecond=207, IOTimePerHit=0.03, hitRatio=31.99%, cachingAccesses=299660, cachingHits=95870, cachingHitsRatio=31.99%, evictions=7, evicted=95977, evictedPerRun=13711.0
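If you want to track the hit ratio over time rather than eyeballing the logs, a small parser is enough. This Python sketch assumes the key=value format shown above; parse_bucket_cache_stats is a hypothetical helper, and the values come back as strings.

```python
import re

LINE = ("2017-08-23 09:15:28,192 INFO  [BucketCacheStatsExecutor] bucket.BucketCache: "
        "failedBlockAdditions=0, totalSize=5.86 GB, freeSize=4.28 GB, usedSize=1.58 GB, "
        "cacheSize=1.56 GB, accesses=49844, hits=2231, IOhitsPerSecond=7, IOTimePerHit=0.03, "
        "hitRatio=4.48%, cachingAccesses=49844, cachingHits=2231, cachingHitsRatio=4.48%, "
        "evictions=0, evicted=0, evictedPerRun=NaN")

def parse_bucket_cache_stats(line):
    """Turn the key=value pairs after 'bucket.BucketCache: ' into a dict of strings."""
    _, _, stats = line.partition("bucket.BucketCache: ")
    return dict(re.findall(r"(\w+)=([^,]+)", stats))

stats = parse_bucket_cache_stats(LINE)
hit_ratio = 100 * int(stats["hits"]) / int(stats["accesses"])
print(stats["hitRatio"], round(hit_ratio, 2))
```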

Types of caches

There are two types of caches: the memcache (known as the MemStore in more recent HBase versions) and the blockcache. The memcache holds recent writes; the blockcache is used for reads.

You may want "to reduce the block size of the data stored in disk. Why? When a row is requested by client, the block corresponding to where the row is stored on disk (store file) is read into memory (cache) before sending it back the requested data to the client. So by decreasing the block size more relevant data can be stored in cache which can improve read performance." (from here).

Types of Blocks

From here: "HBase blocks and HDFS blocks are different things:

  • HBase blocks are the unit of indexing (as well as caching and compression) in HBase and allow for fast random access. 
  • HDFS blocks are the unit of the filesystem distribution and data locality"

Types of sizes

From here:

"Generally there are 2 sizes involved:

1. HBase Filesize
2. HBase Blocksize

#1 sets the maximum size of a region before it is split. Default used to be 512mb, it's now 1g (but usually it should be even larger)

#2 is the size of the blocks inside the HFiles. Smaller blocks mean better random access, but larger block indexes. I would only increase that if you have large cells."

Heap or off-Heap?

Making the cache off-heap really improved matters for me:

"When bucket cache is enabled in HBase, it will act as L2 (level 2 similar to processor cache hierarchy) cache and the default block cache will act as L1 cache. What that means is data need to be copied from L2 cache into L1 cache for HBase to service the query request. So on a read request when bucket cache is enabled, data block will be copied from disk onto bucket cache and then onto the block cache before served to the client making the request. On further reads of the same data, it will either be served directly from block cache or copied from bucket cache into block cache before serving the client request avoiding disk reads. So if there is a 10 node cluster with 128 GB of RAM each, it is possible to have almost 1 TB of data stored in HBase cache with bucket cache enabled and not get impacted by JVM GC which is not the case using just the default block cache." (from here).

So, trying to assign more heap memory seems like a fool's errand. That will just cause a lot of GC. Instead, use off-heap memory.


I was wondering if convolutional neural networks had anything to do with convolution in mathematics. They do. But just as an introduction, here's what convolution in maths is all about.

Laplace transforms

These are useful in solving differential equations. Boaz defines them as:

$$L(f) = \int_0^\infty f(t)\, e^{-pt}\, dt = F(p)$$

Using f(t) as the key, you can look up F(p) in a table. For instance, if

$$f(t) = e^{-at}$$

then

$$F(p) = \frac{1}{a + p}$$

You can work this out by simple integration, look it up in a table of transforms, or use software like SymPy:

from sympy import symbols, exp, laplace_transform

t = symbols('t')
a = symbols('a', positive=True)
p = symbols('p')
print(laplace_transform(exp(-a * t), t, p))

which prints out:

(1/(a + p), 0, True)

Where this gets interesting is when we get recursive. Let f(t) = y and, for brevity, let y' = dy/dt etc. Let's plug the derivative y' into the definition of a Laplace transformation at the top of the page:

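$$L(y') = \int_0^\infty y'\, e^{-pt}\, dt$$

by a standard integration by parts (that says ∫u dv = uv - ∫v du), with u = e^{-pt} and dv = dy, and assuming e^{-pt} y vanishes as t → ∞:

$$L(y') = \int_0^\infty e^{-pt}\, dy = \left[ e^{-pt} y \right]_0^\infty - \int_0^\infty y\, (-p) e^{-pt}\, dt = -y(0) + p L(y)$$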

There was no particular reason to choose f(t) = y. In fact, let's choose f(t) = y' and do the same all over again. We get:

$$L(y'') = -y'(0) + p L(y')$$

Substituting our first result for L(y') gives us:

$$L(y'') = p^2 L(y) - p\, y(0) - y'(0)$$

and so on for any number of derivatives of y. We have our table of transforms for L(y), so we only need to plug that in.
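The derivative rule is easy to check numerically. This pure-Python sketch approximates the transform with Simpson's rule (laplace_numeric is a hypothetical helper, and cutting the integral off at T = 40 is an assumption that only works because e^{-pT} is negligible for the p values used). It tests both the e^{-at} entry from the table and L(y'') = p²L(y) - p y(0) - y'(0) for y = cos(3t), whose transform is p/(p² + 9).

```python
import math

def laplace_numeric(f, p, T=40.0, n=40_000):
    """Approximate L(f)(p), the integral of f(t) e^{-pt} over [0, T],
    with composite Simpson's rule. T stands in for infinity."""
    h = T / n
    total = f(0.0) + f(T) * math.exp(-p * T)
    for k in range(1, n):
        t = k * h
        total += (4 if k % 2 else 2) * f(t) * math.exp(-p * t)
    return total * h / 3

a = 2.0
results = []
for p in (1.0, 2.5):
    exp_transform = laplace_numeric(lambda t: math.exp(-a * t), p)  # expect 1 / (a + p)
    # y = cos(3t): y(0) = 1, y'(0) = 0 and y'' = -9 cos(3t)
    Ly = laplace_numeric(lambda t: math.cos(3 * t), p)
    Lypp = laplace_numeric(lambda t: -9 * math.cos(3 * t), p)
    rule = p ** 2 * Ly - p * 1.0 - 0.0  # p^2 L(y) - p y(0) - y'(0)
    results.append((p, exp_transform, Ly, Lypp, rule))
    print(p, Lypp, rule)
```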


Now, as an example, take the equation:

A y''(t) + B y'(t) + C y(t) = f(t) where y(0) = y'(0) = 0

which is a typical physics equation (where a particle is at rest and the force applied to it starts at t = 0). YMMV.

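then applying the Laplace transform to all terms (the initial conditions are zero, so the y(0) and y'(0) terms vanish):

$$A p^2 L(y) + B p L(y) + C L(y) = L(f)$$

so

$$L(y) = \frac{L(f)}{A p^2 + B p + C} = \frac{L(f)}{A (p + a)(p + b)}$$

where -a and -b are the roots of A p² + B p + C, so that A(p + a)(p + b) = A p² + B p + C.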

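But, as it happens, the factor 1/(A(p + a)(p + b)) is also a Laplace transform. From Boaz' table (or by partial fractions), when a ≠ b:

$$\frac{1}{A (p + a)(p + b)} = L\left( \frac{e^{-at} - e^{-bt}}{A (b - a)} \right)$$

Let's call it T(p).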

So, now we have:

L(y) = T(p) L(f)

and we can conclude that y "is the inverse transform of two functions whose inverse we know".

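Ok, that's an example. Let's see the general case. Let G(p) and H(p) be the transforms of g(t) and h(t) respectively. Then:

$$G(p) H(p) = \int_0^\infty g(\sigma) e^{-p\sigma}\, d\sigma \int_0^\infty h(\tau) e^{-p\tau}\, d\tau$$

from the definition of a Laplace transformation (where we are using σ and τ to avoid confusion with a duplicated t).

Let's do another change of variables and have σ = t - τ for fixed τ, which means that dσ = dt and the lower limit σ = 0 becomes t = τ. So:

$$G(p) H(p) = \int_0^\infty \left( \int_\tau^\infty g(t - \tau) e^{-p(t - \tau)}\, dt \right) h(\tau) e^{-p\tau}\, d\tau = \int_0^\infty \int_\tau^\infty e^{-pt} g(t - \tau) h(\tau)\, dt\, d\tau$$

The region of integration is 0 ≤ τ ≤ t < ∞, so swapping the order of integration (τ now runs from 0 to t), this equation becomes:

$$G(p) H(p) = \int_0^\infty e^{-pt} \left( \int_0^t g(t - \tau) h(\tau)\, d\tau \right) dt = L\left( \int_0^t g(t - \tau) h(\tau)\, d\tau \right) = L(g * h)$$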

Note that τ introduces a sliding window that will be used in convolutional neural nets (part 2).
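The convolution result can be checked numerically too. In this pure-Python sketch (the helpers are hypothetical, and cutting the outer integral off at T = 30 is an assumption), g(t) = e^{-t} and h(t) = e^{-2t}, so G(p)H(p) = 1/((p + 1)(p + 2)), and the numerically computed L(g * h) should agree. Worked out by hand, (g * h)(t) = e^{-t} - e^{-2t}.

```python
import math

def simpson(f, lo, hi, n):
    """Composite Simpson's rule on [lo, hi] with an even number n of intervals."""
    if lo == hi:
        return 0.0
    dx = (hi - lo) / n
    total = f(lo) + f(hi)
    for k in range(1, n):
        total += (4 if k % 2 else 2) * f(lo + k * dx)
    return total * dx / 3

def g(t):
    return math.exp(-t)      # G(p) = 1 / (p + 1)

def h(t):
    return math.exp(-2 * t)  # H(p) = 1 / (p + 2)

def convolution(t, n=200):
    # (g * h)(t) = integral over tau in [0, t] of g(t - tau) h(tau)
    return simpson(lambda tau: g(t - tau) * h(tau), 0.0, t, n)

def laplace(f, p, T=30.0, n=3000):
    # Integral of f(t) e^{-pt} over [0, T], with T standing in for infinity
    return simpson(lambda t: f(t) * math.exp(-p * t), 0.0, T, n)

p = 1.5
product = laplace(g, p) * laplace(h, p)  # G(p) H(p)
transformed = laplace(convolution, p)    # L(g * h)
print(product, transformed, 1 / ((p + 1) * (p + 2)))
```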

Wednesday, August 9, 2017

3D Plots

I'm creating a Spark linear algebra library but wanted to plot a few graphs to get an idea of what's going on. These are the tools I tried.


Sympy looked good as it allowed implicit functions. For instance, you can write:

from sympy import symbols
from sympy.plotting import plot3d

x, y = symbols('x y')

p = plot3d((x/2) + y, (x, -5, 5), (y, -5, 5))

and get the plane that represents z = (x/2) + y.

The problem was that "SymPy uses Matplotlib behind the scenes to draw the graphs" [1] and Matplotlib's 3D rendering has z-order problems (surfaces are not always drawn in the right depth order).

So, although SymPy gives you a lot more than just plotting, I moved on to...


... Mayavi, which renders things beautifully.

Until I tried to do something clever.

First, I had to update pip. Then I tried:

from mayavi import mlab
mlab.options.backend = 'envisage'

which gave me:

ImportError: No module named envisage.ui.workbench.api

And after some googling, it looked like I needed to mess around with all sorts of libraries so I was a coward and gave up quickly.


It turns out that there is a perfectly good Java library called Jzy3d which is quite mature.


Take two vectors:

v1 = [8, -2, 2]
v2 = [4,  2, 4]

I wanted to plot these vectors, the surface on which they lie and the shortest distance between this plane and [5, -5, 2] (this exercise can be found in Philip Klein's excellent "Coding the Matrix").

First, I had to convert this information to something Jzy3d can handle - which looks like this:

        // z as a function of x and y, for the plane through v1 and v2
        Mapper plane = new Mapper() {
            public double f(double x, double y) {
                return (x + (2 * y)) / 2;
            }
        };

that is, we first need to turn these vectors into an implicit function. Well, any point (x, y, z) on the plane can be described as s v1 + t v2 where s and t are any real numbers. Turning this information into parametric equations is easy:

x = 8s  + 4t
y = -2s + 2t
z = 2s  + 4t

and solving these simultaneous equations (eliminating s and t) gives us the implicit function:

x + 2y - 2z = 0

or, rearranged for z:

z = (x + (2 * y)) / 2

which is the equation we find in our Java code above.

The normal to a plane given by an implicit function is simply its vector of coefficients, that is [1, 2, -2]. The point on the plane closest to [5, -5, 2] (which we'll call b) is where the normal through b meets the plane, i.e. where b + u n = s v1 + t v2 for some scalar u, with n = [1, 2, -2]. Once again, convert to parametric equations and then solve the simultaneous equations and you'll find u = 1, s = 1 and t = -0.5. This will give the point [6, -3, 0] and Jzy3d renders it like this:

where the red line is the normal, the green line is the point and the blue line is the closest point on the surface.

The rendering can be a bit jagged and I need to look at adding arrows to the vectors but so far, this seems to be the easiest plotting library to use.
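The arithmetic can be double-checked in a few lines of plain Python. Here the normal comes from the cross product v1 × v2, which gives [-12, -24, 24], a multiple of [1, 2, -2]; removing b's component along it gives the closest point, and Cramer's rule on the x and y coordinates recovers s and t. The helper names are my own.

```python
def cross(u, v):
    return [u[1] * v[2] - u[2] * v[1],
            u[2] * v[0] - u[0] * v[2],
            u[0] * v[1] - u[1] * v[0]]

def dot(u, v):
    return sum(x * y for x, y in zip(u, v))

v1, v2, b = [8, -2, 2], [4, 2, 4], [5, -5, 2]

n = cross(v1, v2)          # normal to the plane
u = dot(b, n) / dot(n, n)  # component of b along the normal
closest = [bi - u * ni for bi, ni in zip(b, n)]

# Solve s*v1 + t*v2 = closest using the x and y coordinates (Cramer's rule)
det = v1[0] * v2[1] - v2[0] * v1[1]
s = (closest[0] * v2[1] - v2[0] * closest[1]) / det
t = (v1[0] * closest[1] - closest[0] * v1[1]) / det
print(n, closest, s, t)
```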

[1] Doing Math with Python (No Starch Press)

Monday, July 10, 2017

Building Big Data Apps

After spending the last 18 months using Spark to write entity resolution software for over a terabyte of data, here are some miscellaneous notes of what I wish I'd known from the start. In no particular order:

1. Make sure your app can recover from a failure easily. Write to HDFS after each major stage. This will also help debugging when the answer that comes out of the sausage machine wasn't entirely what you were expecting. ("Simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up" from here). Also, the topology for one stage may not be appropriate for another (see here for an example where smaller numbers of executors with more resources - contrary to the general Spark advice - gives better performance).

2. Small inefficiencies can cause big problems. Use jstack liberally.

3. Don't use Spark as a key/value lookup. That's not what it's built for. Use another system. Don't try to hack it by using a broadcast variable as that simply doesn't scale.

4. Use realistic data, both in size and quality. Making fake data is surprisingly hard if you want the output to remotely correspond to the real world.

5. Have an automated test in a realistic environment (you don't want authentication problems to show up late, for example). Run the app daily in this environment to show any performance changes.

6. "Don't start modeling before designing some measurable goals" [1]. Define acceptance criteria early on as to what you'd expect to come out and what you wouldn't. Estimate the false positive/negative rate. For example, at one point we expected 220k company entities to resolve with Orbis data. Using a very simple query, we were seeing only about 130k of our businesses resolve to anything. Therefore, the true positive rate could not be higher than about 60% (130k / 220k ≈ 59%, and it may have been less), so there was work to do here.

7. Pass small objects around, preferably just IDs. This is what the built-in Connected Components algorithm does. It will improve performance.

The stages of my app

There were 6 stages to my application:

1. Build a matrix using TF-IDF to assign weights to words.
2. Calculate the cosine similarities.
3. Execute bespoke business rules.
4. Find connected components.
5. Turn those IDs back into entities.
6. Consolidate the relationships between these resolved entities in a process called Edge Contraction.
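Stage 4 can be illustrated on a single machine with union-find. This is a hypothetical sketch, not GraphX's (Pregel-based) connectedComponents, but like GraphX it labels each ID with the smallest ID in its component, and it only ever passes IDs around, in the spirit of note 7 above.

```python
def connected_components(ids, matches):
    """Label each ID with the smallest ID in its connected component."""
    parent = {i: i for i in ids}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving keeps the trees shallow
            x = parent[x]
        return x

    for a, b in matches:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            # Attach the larger root under the smaller, so roots stay the minimum ID
            parent[max(root_a, root_b)] = min(root_a, root_b)
    return {i: find(i) for i in ids}

# Hypothetical entity IDs and the pairs the business rules matched
ids = [1, 2, 3, 4, 5, 6]
matches = [(1, 2), (2, 3), (5, 6)]
print(connected_components(ids, matches))
```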

There were some interesting modifications to this basic flow.

1. Feature hashing improved run time performance but the largest connected component went from 600 to 26 000 (BlackRock/Merrill Lynch who seem to have created what appears to be a lot of shell companies with similar names).

2. By ignoring all words that appear in over 1000 documents, there was no need for stop words. This was useful since the corpus was multilingual.
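Stages 1 and 2, together with the document-frequency cut-off from point 2, can be sketched in a few lines of Python. This is a single-machine illustration under my own assumptions (whitespace tokenizing, lower-casing, IDF as log(N/df)), with max_df standing in for the "over 1000 documents" threshold; the app itself ran this on Spark.

```python
import math
from collections import Counter

def tfidf_vectors(docs, max_df):
    """TF-IDF weights per document, ignoring words found in more than max_df documents."""
    tokenized = [doc.lower().split() for doc in docs]
    df = Counter(word for tokens in tokenized for word in set(tokens))
    n_docs = len(docs)
    vectors = []
    for tokens in tokenized:
        tf = Counter(w for w in tokens if df[w] <= max_df)
        vectors.append({w: count * math.log(n_docs / df[w]) for w, count in tf.items()})
    return vectors

def cosine(u, v):
    dot = sum(weight * v.get(w, 0.0) for w, weight in u.items())
    norm_u = math.sqrt(sum(x * x for x in u.values()))
    norm_v = math.sqrt(sum(x * x for x in v.values()))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0

# Hypothetical company names; "ltd" appears in every document and is dropped.
docs = ["Acme Holdings Ltd", "Acme Holdings Ltd UK", "Zenith Ltd"]
vectors = tfidf_vectors(docs, max_df=2)
print(cosine(vectors[0], vectors[1]), cosine(vectors[0], vectors[2]))
```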

A note on requirements gathering

One tip for finding which database suits you comes from the late Dr Jim Gray. "Gray's recipe for designing a database for a given discipline is that it must be able to answer the key 20 questions that the scientist wants to ask of it." [2] A real-world example can be found here. The idea is that 20 questions is roughly the minimum number of questions you need before a pattern emerges.

This teased out of the business that we needed more than just a graph database (like Neo4J), which they seemed for some reason to have fixated on at one point. We also needed the batch processing that GraphX gave us.

[1] Practical Data Science with R.
[2] The Fourth Paradigm: Data-Intensive Scientific Discovery, Tony Hey.

Parquet Flaw

This Spark/Parquet abuse bit us hard this week. "In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory" (from here). This is great for compression, as the partition column's values do not even appear in the data files: they are encoded in the directory structure.

Unfortunately, if you save the Dataset to HDFS, it appears that a new file on HDFS is created for each contiguous block of rows that belongs to the same Parquet directory. For example, if there were only two values for a key, 1 and 2, and a partition of the Dataset held rows with keys in this order:

1, 1, 2, 1, 2, 2, 2, 1, 1, 1

then this piece of the sequence would result in 5 different files having keys [1,1], [2], [1], [2,2,2] and [1,1,1].

So, instead of fewer but larger files, we had many smaller files. This is a scenario that HDFS does not handle very well and it manifested its displeasure by a DDOS on the Name Node.

What's more, resolving the problem by deleting them also caused the Name Node pain. Using the -skipTrash flag made things a little better.

The solution was to sort the Dataset before saving.
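A small Python model of the behaviour described above: if each contiguous run of the same partition-column value ends up as its own file, counting runs counts files, and sorting first collapses the runs. files_written is a hypothetical helper; this models what we observed, not Spark's writer code.

```python
from itertools import groupby

def files_written(partition_keys):
    """One 'file' per contiguous run of the same partition-column value."""
    return [list(run) for _, run in groupby(partition_keys)]

keys = [1, 1, 2, 1, 2, 2, 2, 1, 1, 1]
print(len(files_written(keys)))          # unsorted: 5 files
print(len(files_written(sorted(keys))))  # sorted: 2 files, one per key
```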

Tuesday, July 4, 2017

Functional Foibles

Functional languages are supposed to protect you from nasties like NullPointerException etc but there are some gotchas.

      a[NoSuchElementException] should be thrownBy { List.empty[Int].head }

Curiously, Haskell does the same.

$ ghci
GHCi, version 7.10.3:  :? for help
Prelude> head [] :: [Int]
*** Exception: Prelude.head: empty list

But this little annoyance occurred in a Spark job (here illustrated with ScalaTest):

      an[UnsupportedOperationException] should be thrownBy { List.empty[Int].max }

A neat mitigation can be found here and looks like this:

      List.empty[Int].reduceOption(Math.max) shouldBe None

An interesting point was made here on how things should be:
I would suggest that max is not an option. If you have a type (empty[T]), then this fold (I mean max) should return the neutral element regarding max(x,y), that is, the minimal element of the (partial) order of T, if one exists; if it does not, then it makes sense to throw an exception. 
Same as with other monoidal operations, e.g. empty.sum is a zero, not an exception. 
This would be the right mathematical answer.
As a reminder, a partial order is reflexive (x ≤ x), antisymmetric (x ≤ y and y ≤ x imply x = y) and transitive (x ≤ y and y ≤ z imply x ≤ z). Contrast this with a total order, which is antisymmetric, transitive and total (x ≤ y or y ≤ x for every x and y).

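So, what's being said here is that for, say, Double, max over an empty collection should return the least element of the order. Strictly, that is Double.NegativeInfinity rather than Double.MinValue, which is only the most negative finite Double. On non-NaN Doubles, ≤ is transitive (obviously), reflexive (clearly x ≤ x) and antisymmetric (if NegativeInfinity ≤ x and x ≤ NegativeInfinity, then the only conclusion is x = NegativeInfinity).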

Compare this to Double.NaN: NaN ≤ NaN is false, so once NaN is included, ≤ is not even reflexive and is therefore not a partial order.

So, to be mathematically correct, List.empty[Int].max should return Int.MinValue (and List.empty[Double].max should return Double.NegativeInfinity).
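The point about neutral elements can be sketched with ordinary Scala folds: seeding the fold with the least element of the order makes max total on empty lists, just as seeding with 0 does for sum. `Neutral`, `maxOrLeast` and `sumOf` are hypothetical helpers for illustration, not standard library methods.

```scala
object Neutral {
  // max as a fold seeded with Int's least element, the neutral element of max
  def maxOrLeast(xs: List[Int]): Int = xs.foldLeft(Int.MinValue)(math.max)

  // sum is the same shape of fold, seeded with 0, the neutral element of +
  def sumOf(xs: List[Int]): Int = xs.foldLeft(0)(_ + _)
}

println(Neutral.maxOrLeast(List(3, 7, 5)))  // 7
println(Neutral.maxOrLeast(Nil))            // -2147483648, i.e. Int.MinValue
println(Neutral.sumOf(Nil))                 // 0

// For Double, the least element is NegativeInfinity, not MinValue
println(Double.NegativeInfinity < Double.MinValue)  // true

// NaN breaks reflexivity: NaN <= NaN is false
println(Double.NaN <= Double.NaN)  // false
```

head has no such seed to fall back on, which is why the semigroup case ends in an exception or an Option (headOption).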

"Head is a different story; here we have a semigroup, not a monoid. There's no neutral element."

And, as the quote says, the empty sum is indeed zero:

      List.empty[Int].sum shouldBe 0

Spark Dataframes and Datasets

Although RDDs are conceptually simple, all new optimizations are coming from DataFrames and Datasets. As I upgrade my software from RDDs, these are the notes I've made.


Apache Parquet is a columnar storage format. It can sometimes lead to improved performance, particularly with large data sets. Because it uses a schema and stores the values of each column contiguously (in column chunks), a query only needs to read the columns it touches; in our case, queries over 400MB of data could take just one second.

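You can convert CSV to Parquet with something like:

spark.read // spark: an existing SparkSession; csvPath and parquetPath are hypothetical
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvPath).write.parquet(parquetPath)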

We can now read this in with something like:

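import spark.implicits._ // for the $"..." column syntax
val df = spark.read.parquet(parquetPath) // parquetPath: hypothetical Parquet location
val cleaned = df.withColumn("cus_id_no", $"cus_id_no".cast("bigint"))
  .na.fill(0)  // fill nulls in numeric columns
  .na.fill("") // fill nulls in string columns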

Here, we're also casting a column and providing defaults for nulls.


Datasets are type-safe. They can be derived from DataFrames with something like this:

val headerDS = headerDF.withColumn("COLUMN", UDF).withColumn( .... ).as[MyCaseClass]

and manipulated with something like this:

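val ds        = df.as[CaseClass] // assumes CaseClass has a cus_id_no field
// groupByKey returns a KeyValueGroupedDataset, which cannot be joined directly, so
// mapGroups turns it back into a Dataset of (key, count) with columns _1 and _2
val groupedBy = ds.groupByKey(_.cus_id_no).mapGroups((k, rows) => (k, rows.size))
val joined    = ds.joinWith(groupedBy, ds("cus_id_no") === groupedBy("_1"), "left_outer")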

DataFrames are just of type Dataset[Row] - see the type alias in the package object of org.apache.spark.sql:

type DataFrame = Dataset[Row]


"By avoiding the memory and GC overhead of regular Java objects, Tungsten is able to process larger data sets than the same hand-written aggregations" (from here).

Among Tungsten's clever features, it:

1. does not deserialize the whole object (very useful in a join or groupBy, etc.)

2. manages objects off-heap (see here).


"At the core of Spark SQL is the Catalyst optimizer, which leverages advanced programming language features (e.g. Scala’s pattern matching and quasiquotes) in a novel way to build an extensible query optimizer." from here.

The bad news

With these APIs, you have to accept a much less rich interface. For instance, groupByKey returns a KeyValueGroupedDataset, which has a limited set of functions: there is no filter, and mapping to any type that doesn't have an org.apache.spark.sql.Encoder associated with it leads to a compile-time error ending "... Support for serializing other types will be added in future releases".

You can always convert them to RDDs (with .rdd), but then you lose the Catalyst and Tungsten optimizations.

The difference

This RDD version takes 1.2 hours on Orbis data:

 => x.BVDIDnumber -> Seq(x)).reduceByKey(_++_).filter(_._2.size>1).take(10)

whereas this Dataset version takes 5 minutes:

headerDF.groupByKey(_.getString(0)).flatMapGroups { case (key, iter) => val ys = iter.toSeq ; if (ys.size >1) Seq(", "))) else Seq.empty }.take(10)

Aside: this idiom reflects a monadic principle.

The filter method is completely described in one simple law:
FIL1. m filter p ≡ m flatMap {x => if(p(x)) unit(x) else mzero}

(from here).
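Law FIL1 can be checked on ordinary Scala collections, with `List(x)` playing the role of `unit` and `Nil` the role of `mzero`. The second helper mirrors the flatMapGroups idiom above on a local list: emit a group only if it has more than one member, using flatMap in place of filter. The object, helper names and sample rows are made up for illustration.

```scala
object FilterLaw {
  // FIL1: m filter p == m flatMap { x => if (p(x)) unit(x) else mzero }
  def filterViaFlatMap[A](m: List[A])(p: A => Boolean): List[A] =
    m.flatMap(x => if (p(x)) List(x) else Nil)

  // Local analogue of groupByKey(...).flatMapGroups: emit a group only if
  // it has more than one member, otherwise emit nothing.
  def duplicates[K, V](rows: List[V])(key: V => K): List[(K, List[V])] =
    rows.groupBy(key).toList.flatMap { case (k, vs) =>
      if (vs.size > 1) List(k -> vs) else Nil
    }
}

val xs = List(1, 2, 3, 4, 5, 6)
val isEven = (x: Int) => x % 2 == 0
println(xs.filter(isEven) == FilterLaw.filterViaFlatMap(xs)(isEven))  // true

// Hypothetical (id, name) rows
val rows = List("A1" -> "x", "B2" -> "y", "A1" -> "z")
println(FilterLaw.duplicates(rows)(_._1))  // List((A1,List((A1,x), (A1,z))))
```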


If we want to optimize a join, we might want to repartition the DataFrame on the join key, so that rows sharing a key end up in the same partition and each partition can be joined locally.
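Why repartitioning on the key helps can be sketched without Spark: if both sides assign rows to partitions with the same function of the key, matching keys always land at the same partition index, so partition i of one side only ever needs partition i of the other. `partitionOf` mimics the non-negative modulo of `hashCode` used by Spark's RDD `HashPartitioner` (DataFrame repartitioning by column uses a Murmur3 hash instead, but the principle is the same); the object, helpers and data are illustrative.

```scala
object CoPartition {
  // Same key => same partition index, on every dataset that uses this function
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def partition[V](rows: List[(String, V)], n: Int): Map[Int, List[(String, V)]] =
    rows.groupBy { case (k, _) => partitionOf(k, n) }

  // Join partition i of the left side only with partition i of the right side
  def partitionLocalJoin[A, B](left: List[(String, A)], right: List[(String, B)], n: Int): Set[(String, A, B)] = {
    val l = partition(left, n)
    val r = partition(right, n)
    (0 until n).flatMap { i =>
      for {
        (k1, a) <- l.getOrElse(i, Nil)
        (k2, b) <- r.getOrElse(i, Nil)
        if k1 == k2
      } yield (k1, a, b)
    }.toSet
  }
}

val left  = List("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4)
val right = List("a" -> "x", "c" -> "y", "d" -> "z")

// Reference answer: an ordinary join that compares every pair of rows
val global = (for { (k1, a) <- left; (k2, b) <- right if k1 == k2 } yield (k1, a, b)).toSet

println(CoPartition.partitionLocalJoin(left, right, 4) == global)  // true
```

No row ever has to be compared with a row from a different partition index, which is the shuffle that co-partitioning saves.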


Now, if we want to do a join, it looks a little like this:

aDataFrame.join(other, other("record_id") <=> $"record".getItem("record_id"))


where other has this schema:

root
 |-- document_type: string (nullable = true)
 |-- record_id: string (nullable = true)
 |-- entity_id: string (nullable = true)
 |-- entitySize: integer (nullable = true)

and aDataFrame has a struct column like this:

root
 |-- record: struct (nullable = true)
 |    |-- document_type: string (nullable = true)
 |    |-- record_id: string (nullable = true)
Sunday, June 18, 2017


In my physics degree, we were asked to calculate the entropy of a chess board. Students smarter than me snorted at this silly exercise. Yet entropy is just a statistical measure. You cannot measure it directly (there are no entropy-ometers), but it exists everywhere and can be used in odd places like machine learning (e.g. maximum entropy classifiers, which "from all the models that fit our training data, selects the one which has the largest entropy").

Alternatively, you might want to find the configuration with the smallest entropy. An example is here, where the quality of a clustering algorithm (k-means in this case) is assessed by looking at the entropy of the detected clusters. "As an external criteria, entropy uses external information — class labels in this case. Indeed, entropy measures the purity of the clusters with respect to the given class labels. Thus, if every cluster consists of objects with only a single class label, the entropy is 0. However, as the class labels of objects in a cluster become more varied, the entropy value increases."
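The cluster-purity measure in that quote can be sketched as follows: compute the entropy, in bits, of the class labels inside each cluster, then weight each cluster by its share of the points. Weighting by cluster size is a common convention and an assumption here; the object name and labels are made up.

```scala
object ClusterEntropy {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // Entropy, in bits, of the label distribution within one cluster:
  // sum of p * log2(1/p), which equals the sum of -p * log2(p)
  def entropy(labels: Seq[String]): Double = {
    val n = labels.size.toDouble
    labels.groupBy(identity).values.map { group =>
      val p = group.size / n
      p * log2(1 / p)
    }.sum
  }

  // Size-weighted average of the per-cluster entropies
  def weighted(clusters: Seq[Seq[String]]): Double = {
    val total = clusters.map(_.size).sum.toDouble
    clusters.map(c => c.size / total * entropy(c)).sum
  }
}

val pure  = Seq(Seq("cat", "cat"), Seq("dog", "dog", "dog"))
val mixed = Seq(Seq("cat", "dog"), Seq("dog", "cat"))

println(ClusterEntropy.weighted(pure))   // 0.0: every cluster has a single label
println(ClusterEntropy.weighted(mixed))  // 1.0: every cluster is a 50/50 mix
```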

For instance, say you are trying to find the parameters for an equation such that it best fits the data. "At the very least, you need to provide ... a score for each candidate parameter it tries. This score assignment is commonly called a cost function. The higher the cost, the worse the model parameter will be... The cost function is derived from the principle of maximum entropy." [1]

What is Entropy

I found this description of heads (H) and tails (T) from tossing a coin enlightening:

"If the frequency [f] of H is 0.5 and f(T) is 0.5, the entropy E, in bits per toss, is

-0.5 log2 0.5

for heads, and a similar value for tails. The values add up (in this case) to 1.0. The intuitive meaning of 1.0 (the Shannon entropy) is that a single coin toss conveys 1.0 bit of information.

Contrast this with the situation that prevails when using a "weighted" or unfair penny that lands heads-up 70% of the time. We know intuitively that tossing such a coin will produce less information because we can predict the outcome (heads), to a degree. Something that's predictable is uninformative. Shannon's equation gives

-0.7 log2 (0.7) = 0.3602

for heads and

-0.3 log2  (0.3) = 0.5211

for tails, for an entropy of 0.8813 bits per toss. In this case we can say that a toss is 11.87% [1.0 - 0.8813] redundant."
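The coin arithmetic can be reproduced by summing -p log2 p over the outcomes. `Shannon.entropyBits` is just an illustrative helper name.

```scala
object Shannon {
  // H = sum over outcomes of -p * log2(p), in bits; zero-probability outcomes contribute nothing
  def entropyBits(ps: Seq[Double]): Double =
    ps.filter(_ > 0).map(p => -p * math.log(p) / math.log(2)).sum
}

val fair   = Shannon.entropyBits(Seq(0.5, 0.5))
val biased = Shannon.entropyBits(Seq(0.7, 0.3))

println(f"fair coin:   $fair%.4f bits per toss")         // fair coin:   1.0000 bits per toss
println(f"biased coin: $biased%.4f bits per toss")       // biased coin: 0.8813 bits per toss
println(f"redundancy:  ${(1.0 - biased) * 100}%.2f%%")  // redundancy:  11.87%
```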

Here's another example:

X = 'a' with probability 0.5
    'b' with probability 0.25
    'c' with probability 0.125
    'd' with probability 0.125

The entropy of this configuration is:

H(X) = -0.5 log2(0.5) - 0.25 log2(0.25) - 0.125 log2(0.125) - 0.125 log2(0.125) = 1.75 bits

What does this actually mean? Well, if we identify X by asking a sequence of yes/no questions ("Is X 'a'? If not, is it 'b'? ..."), then "the resulting expected number of binary questions required is 1.75" [2].
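The 1.75 can be checked by counting questions under that strategy: ask about the outcomes in order of decreasing probability, so 'a' needs one question, 'b' two, and 'c' or 'd' three (a "no" to "Is it 'c'?" already means 'd'). This counting rule is our reading of the quoted strategy, not code from [2]; `Questions` is a hypothetical helper.

```scala
object Questions {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // Outcomes sorted by decreasing probability; outcome i (0-based) needs i + 1
  // questions, except the last, which is identified by a "no" to the previous one
  def expectedQuestions(ps: Seq[Double]): Double = {
    val sorted = ps.sortWith(_ > _)
    sorted.zipWithIndex.map { case (p, i) =>
      val asked = math.min(i + 1, sorted.size - 1)
      p * asked
    }.sum
  }

  def entropyBits(ps: Seq[Double]): Double = ps.map(p => p * log2(1 / p)).sum
}

val px = Seq(0.5, 0.25, 0.125, 0.125)
println(Questions.expectedQuestions(px))          // 1.75
println(f"${Questions.entropyBits(px)}%.2f")      // 1.75
```

The two agree exactly here because every probability is a power of 1/2. In general, the best yes/no strategy (a Huffman code) needs at least H(X) and fewer than H(X) + 1 questions on average.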

Derivation of entropy

Basically, we want entropy to be extensive, that is, one of the "parameters that scale with the system. In other words U(aS,aV,aN)=aU(S,V,N)".

So, if $S_X$ is the entropy of system X, then the combined entropy of two independent systems, A and B, would be:

$$S_C = S_A + S_B$$

Second, we want it to be largest when all the states are equally probable. Writing the entropy as the average of some function f of the state probabilities, we have:

$$S = \langle f \rangle = \sum_i p_i \, f(p_i) \qquad \text{(Equation 1)}$$

Now, given two independent subsystems, A and B, the combined system C will have entropy:

$$S_C = \sum_i \sum_j p_i p_j \, f(p_i p_j) \qquad \text{(Equation 2)}$$

that is, we are summing over the joint states in which A is in state i and B is in state j, each of which has probability $p_i p_j$ because A and B are independent.

For equation 2 to conform to the form of equation 1, let's introduce the variable $p_{ij} = p_i p_j$. Then:

$$S_C = \sum_i \sum_j p_{ij} \, f(p_{ij})$$

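For $S_C = S_A + S_B$ to hold, we need $f(p_i p_j) = f(p_i) + f(p_j)$: then, since $\sum_i p_i = \sum_j p_j = 1$, the double sum splits into $\sum_i p_i f(p_i) + \sum_j p_j f(p_j)$. This is satisfied by $f(p) = C \ln p$, since ln(ab) = ln(a) + ln(b). Taking $C = -k$ with $k > 0$ makes $S = -k \sum_i p_i \ln p_i$ non-negative, and $k = 1/\ln 2$ gives the entropy in bits used in the coin example.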

This is the argument found here.
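A quick numerical sanity check of the argument: for two independent systems, building the joint distribution p_ij = p_i p_j and using f(p) = -log2 p (that is, C = -1/ln 2) gives a combined entropy equal to the sum of the parts. The two distributions are arbitrary examples and `Additivity` is a hypothetical helper.

```scala
object Additivity {
  // S = sum_i p_i f(p_i) with f(p) = -log2(p), i.e. C = -1/ln 2 in f = C ln p
  def entropy(ps: Seq[Double]): Double =
    ps.map(p => -p * math.log(p) / math.log(2)).sum

  // Joint distribution of two independent systems: p_ij = p_i * p_j
  def joint(a: Seq[Double], b: Seq[Double]): Seq[Double] =
    for (pi <- a; pj <- b) yield pi * pj
}

val pa = Seq(0.7, 0.3)
val pb = Seq(0.5, 0.25, 0.25)

val sA = Additivity.entropy(pa)
val sB = Additivity.entropy(pb)
val sC = Additivity.entropy(Additivity.joint(pa, pb))

println(f"S_A + S_B = ${sA + sB}%.6f, S_C = $sC%.6f")  // both print as 2.381291
```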

[1] Machine Learning with TensorFlow.
[2] Elements of Information Theory.