Thursday, August 20, 2015

Spark Shuffle

... is something you should avoid.  "All shuffle data must be written to disk and then transferred over the network" (from here). This can be very expensive on a large data set. If the data is evenly distributed but a query asks for it to be grouped according to an arbitrary criteria, much movement will ensue. Therefore, groupByKey should be avoided if possible as the ScalaDocs indicate this can be an expensive operation.

However, reduceByKey is much more efficient as it will "perform the merging locally on each mapper before sending results to a reducer" resulting in a much smaller transfer of data between nodes.

So, when we were writing a risk engine with Spark holding all our user-defined Record objects, our code would look something like:

def totalByKey(data: RDD[Record]): RDD[(String, Double)] = { => record.key -> record.value).reduceByKey(_ + _)

which would first turn our Records into key/value pairs then add up all the values for each key without transferring large amounts of data over the network. It avoids this by adding all the values for Records on each node before adding all these subtotals. Only the addition of subtotals requires a network call.

Interestingly, Spark can then do an outer join like this:

val subtotalForDate1 = totalByKey(dataForDate1)
val subtotalForDate2 = totalByKey(dataForDate2)
val outerJoing       = subtotalForDate1.fullOuterJoin(subtotalForDate2)

which returns an RDD of tuples containing the key, the left and the right value. Using this join, we can compare our risk profile from one date to the next.

Note that both reduceByKey and fullOuterJoin don't actually live in the RDD class but the PairRDDFunctions class. They appear to be part of the RDD class by the magic of implicits when the RDD in question contains tuples of pairs.

One last thing to remember is that if you're trying to be clever and reduce collections of elements, you're no better off than using groupByKey. The function reduceByKey is only useful if you are reducing something like a number. If you look at the code for groupByKey you'll see that it's basically doing just that: reducing collections.

Further Reading

Nice article on memory usage.

Tuesday, August 18, 2015

Try Monads

Martin Odersky in the Reactive Programming course mentions that Try types are not true monads as they violate the Left Unit rule. But if you try (no pun intended) it, Try does indeed appear to obey the rules for monads.

Just to recap, here are the monad rules expressed in Scala:

  def testMonadicPropertiesOfTry[T, U](f: T => Try[U], g: U => Try[U], m: Try[T], x: T, unit: T => Try[T]): Boolean = {

    // Associativity: (m flatMap f) flatMap g == m flatMap (x => f(x) flatMap g)
    def associativity: Boolean = {
      val associativityLhs = (m flatMap f) flatMap g
      val associativityRhs = m flatMap (x => f(x) flatMap g)
      assertEqual(associativityLhs, associativityRhs)
    val associativityResult = Try(associativity)

    // Left unit: unit(x) flatMap f == f(x)
    def leftUnit: Boolean = {
      val leftUnitLhs = unit(x) flatMap f
      val leftUnitRhs = f(x) 
      assertEqual(leftUnitLhs, leftUnitRhs)
    val leftUnitResult = Try(leftUnit)

    // Right unit: m flatMap unit == m
    def rightUnit: Boolean = {
      val rightUnitLhs = m flatMap unit
      assertEqual(rightUnitLhs, m)
    val rightUnitResult = Try(rightUnit)

    (associativityResult, leftUnitResult, rightUnitResult) match {
      case (Success(_), Success(_), Success(_)) => true
      case _ => false

Where my assertEqual method looks like:

  def assertEqual[T](try1: Try[T], try2: Try[T]): Boolean = {
    try1 match {
      case Success(v1) => try2 match {
        case Success(v2) => v1 == v2
        case _ => false
      case Failure(x1) => try2 match {
        case Failure(x2) => x1.toString == x2.toString
        case _ => false

That is, it will compare Failures by looking at the text of their messages. This is because Java exceptions don't have an equals() method.

Now, if we run the code below (borrowed liberally from here) where we're deliberately trying to cause a java.lang.NumberFormatException as the code attempts to convert our 'a' into a numeric:

    def factory[T](x: => T): Try[T] = Try(x)

    def unit[T](x: T): Try[T]   = factory(x)
    def f(x: String): Try[Int]  = factory(x.toInt)
    def g(x: Int): Try[Int]     = factory(x + 1)
    val x                       = "a"
    val m                       = factory(x)

    val isMonadic = testMonadicPropertiesOfTry(f, g, m, x, unit[String])
    println("is monadic? " + isMonadic)

the output says it's true, Try[T] obeys the rules for a monad. What gives?

Mauricio Linhares says: "there is some debate as to if Try[U] is a full monad or not. The problem is that if you think unit(x) is Success(x), then exceptions would be raised when you try to execute the left unit law since flatMap will correctly wrap an exception but the f(x) might not be able to do it. Still, if you assume that the correct unit is Try.apply then this would not be an issue."

So, let's take the first line of the last code snippet and make it thus:

    def factory[T](x: => T): Try[T] = Success(x)

whereupon we are told that Success is not a monad at all. Further investigation reveals that in the leftUnit method:

      val leftUnitLhs = unit(x) flatMap f

works fine but:

      val leftUnitRhs = f(x) 

blows up. The left hand side does not equal the right.

The reason for this is that Success.flatMap catches any NonFatal exceptions just like the constructor of Try. But the constructor of Success does not. And it's this asymmetry that means Try acts like a monad and Success does not.

Further reading

An interesting debate about monads and exceptions here.

Monday, August 10, 2015

The Mathematics of Linear Regression

There's an excellent and free book online called The Elements of Statistical Learning that can be found here. I was browsing through it but hit one of those points where a derivation misses some steps and confuses the reader. It took me a few hours to work it out and it's not too hard but just in case somebody it's the same problem, here is a step-by-step derivation.

The section was on Linear Models and Least Squares (2.3.1) that says:
Ŷ β̂0   p 

j = 1


Where β̂is a constant called the bias, β̂ is a vector of coefficients, X is a vector of inputs and Ŷ is the prediction.

To make things more mathematically convenient, let's subsume β̂0 and the β̂ vector of coefficients into one big vector such that the above equation becomes:

Ŷ = XTβ̂

This is exactly the same equation in matrix notation rather than summation notation (XT is simply the transpose of X).

So, the question becomes what's the optimal value of β̂ ?

Let's re-express this thus: say that we have N trials each with a result yi (this y has nothing to do with Ŷ). All these results can be put in a vector, y. Let's say the matrix X has N rows each representing p data points that are properties of the system (that is, it's an N x p matrix).

Then the error can be expressed as:

y - X ß

(that is, the output values minus all of the input values that are multiplied by our coefficients). This is what we want to minimize.

How do we do this? Well an obvious way is to find the smallest sum of squares of the differences. So, let's square it:

(y - X ß)T (y - X ß)

then expand it noting that matrices are distributive, associative but not commutative and:

(A + B)T = AT + BT              (1)


(AB)T = BT AT                   (2)


= (yT - ßT XT ) (y - X ß)
= yTy - yT X ß ßT XT y + ßT XT X ß

and differentiate it with respect to ß. Where the result of this differentation equals 0 is our optimal value for ß.

The first term is easy to differentiate. There is no dependence of y on ß so it disappears and our equation becomes.

 δ (- yT X ß ßT XT y + ßT XT X ß) = 0     (3)

The next bit is tricky. To help, let's take a look at this useful summary on matrix calculus. The interesting sections for us are Proposition 7 (equations 36 and 37) and Proposition 9 (equation 49). They say:


α = yT A x


δαyT A              (4)


δα = xT AT             (5)

where A is independent of and y.

And if:

α = xT A x


δα = 2 xT A            (6)

where A is symmetric.

Now, take the first term of (3). That's basically equation (4) if we replace A for X and x for ß.

Take the second term of (3). That's basically equation (5) if we replace A for XT and y for ß and x for y.

Finally, take the third term of (3) and that looks like (6) if we replace A for XX and x for ß. How do we know that XX is symmetric? Well any matrix multiplied by its transpose is symmetric (see here for the simple proof).

So, taking these three observations, equation (3) becomes:

0 = - yT - yT X + 2 ßXX
  ßX- 2 yT X 
  = (ßXT - yTX 

Now take the transpose of both sides (the transpose of 0 is 0) and using (1) and (2):

0 = XT (ßXT - yT)T
  = XT ((ßXT)T- (yT)T)
  = XT (ß y)
  = XT ß XT y


XT ß = XT y

and multiplying both sides with (XT X)-1 gives:

ß = (XT X)-1 XT y

which is equation 2.6 in The Elements of Statistical Learning

Wednesday, August 5, 2015

K-Means Clustering

K-Means is a clustering algorithm that tells you were the centres are in a set of data points. It "has the major disadvantage that you must pick k [the number of centres] in advance. On the plus side, it’s easy to implement". A simple implementation is just a few lines of Scala although it can be used on a much larger scale in MLLib/Spark.

First, let's create some data - just rows of x,y such that when plotted in R it looks like this:

data <- read.csv("/tmp/point.csv", header=F)
matplot(data[,1],data[,2] ,type="p", pch=3, xlab="x", ylab="y")

It's clear where to the human eye that there are 2 clusters here and where their centres are, although it would be much harder just given the raw data.

The snippet of Scala code to calculate K-Means looks something like this:

case class Distance(point: Point2D, distance: Double)
case class Point2D(x: Double, y: Double)
  implicit val pointOrdering = new Ordering[Distance] {
    override def compare(x: Distance, y: Distance): Int = (x.distance - y.distance).toInt

  def iterate(centres: Seq[Point2D], others: Seq[Point2D]): Seq[Point2D] = {
    val clusterPairs = => (centre, Seq[Point2D]()))
    val clusters     = Map(clusterPairs: _*) 
    val assigned     = assign(others, centres, clusters) => update(centrePoint._2 :+ centrePoint._1))[collection.immutable.Seq]
  def update(points: Seq[Point2D]): Point2D = {
    val distances = => sumOfDistances(point, points))
  def sumOfDistances(point: Point2D, others: Seq[Point2D]): Distance = {
    val distances = => distanceFn(other, point))
    Distance(point, distances.sum)
  def distancesFrom(centre: Point2D, others: Seq[Point2D]): Seq[Distance] = => Distance(point, distanceFn(centre, point)))

Where we call iterate an arbitrary number of times until we're happy we have the centres. The iterate method is initially called with k randomly chosen points that will act as our initial guesses at where the centres are. With each iteration, out guess will be refined.

For each iteration, we do two things. First we assign each point to one of our k estimated centres. Then, within each grouping, we find the point that is the most central. These points will be the basis for our next iteration.

We define the central points within a grouping as those that have the minimum value of adding up all the distances to the other points.

The interesting thing is that the resulting cluster points may change somewhat between runs as the initial choice of points may make a difference. (that is, it's unstable) Therefore, it might be necessary to run it a few times and see the most common outcome.

We can verify the results in R with:

> kmeans(data, 2) # 2 being the number of clusters we're expecting
K-means clustering with 2 clusters of sizes 33, 33

Cluster means:
  V1 V2
1 30 30
2 10 10

Which is the same result as my code when run with 10 iterations and taking the most popular results from 10 runs.