Saturday, October 22, 2016

State monads

State is generally anathema to functional programmers but necessary to the rest of us. So, how are these forces reconciled? Perhaps not surprisingly, the answer is monads.

"We'll say that a stateful computation is a function that takes some state and returns a value along with some new state." (from LYAHFGG). So, if you had a random number generator (RNG), for each call you'd receive not just a random number but the generator with a new state for the next call. Calling the same generator repeatedly will give you the same value which makes testing easier.

Scalaz gives you a State out of the box. As an example, let's say we're modelling a stack data structure. The pop would look like this:

import scalaz._
import Scalaz._

    type Stack = List[Int]

    val pop = State[Stack, Int] { // (f: S => (S, A)) where S is Stack and A is Int
      case x :: xs =>
        (xs, x)
    }

Note that pop is a val not a def. Like the RNG mentioned earlier, it returns the popped value and the new state. We'll ignore error conditions like popping an empty stack for now.

Also, "the important thing to note is that unlike the general monads we've seen, State specifically wraps functions." (Learning Scalaz)

Let's now define what a push is:

    def push(a: Int) = State[Stack, Unit] { case xs =>
      println(s"Pushing $a")
      (a :: xs, ())
    }

Once more, the function returns the new state and a value - only this time the value carries no information: it's just Unit. What value can a push to a stack return that you didn't have in the first place?

Because State is a monad, "the powerful part is the fact that we can monadically chain each operation using for syntax without manually passing around the Stack values". That is, we access the contents of State via map and flatMap.

So, let's say we want to pop the value at the top of the stack then push two values onto it, 3 and 4.

    val popx1Push2: State[Stack, Int] = for {
      a <- pop
      _ <- push(3)
      _ <- push(4)
    } yield a 

which, on its own, does absolutely nothing. What is to be popped? Onto what do we push 3 and 4? We need to run it with an initial stack and then destructure (or map over) the result to access the values. "Essentially, the reader monad lets us pretend the value is already there." (from Learning Scalaz)

    val (state, popped) = popx1Push2.run(List(0, 1, 2))
    println(s"New state = $state, popped value = $popped") // New state = List(4, 3, 1, 2), popped value = 0

But we don't need Scalaz. We can roll our own state (with code liberally stolen from here).

case class State[S, +A](runS: S => (A, S)) {

  def map[B](f: A => B): State[S, B] =
    State[S, B] { s =>
      val (a, s1) = runS(s)
      (f(a), s1)
    }

  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State[S, B] { s =>
      val (a, s1) = runS(s)
      f(a).runS(s1)
    }
}
Now it becomes clearer what State actually represents. It is a monad (see map and flatMap) that wraps a function which, given one state, S, can give you the next state plus an accompanying value, A.

So, manipulating the state for our stack example looks like this:

  def getState[S]: State[S, S] =
    State(s => (s, s))

where the function says: given a state, return that state.

  def setState[S](s: S): State[S, Unit] =
    State(_ => ((), s))

where the function says: I don't care which state you give me, the next one is s. Oh, and I don't care about the accompanying value.

  def pureState[S, A](a: A): State[S, A] =
    State(s => (a, s))

where the function says the next state is the last state and the accompanying value is what I was given. Pure in this sense refers to the construction of a state, sometimes called unit, and called point in Scalaz.

Now, we do a stateful computation. It's very simple, it just maps over the state monads to generate a unit of work that adds 1 to whatever is given.

    val add1: State[Int, Unit] = for {
      n   <- getState
      _   <- setState(n + 1)
    } yield ()

    println(add1.runS(7)) // ((),8) - the accompanying value is just Unit and the new state is 8

This is trivial but you can do more complicated operations like:

  def zipWithIndex[A](as: List[A]): List[(Int, A)] =
    as.foldLeft(
      pureState[Int, List[(Int, A)]](List())
    )((acc, a) => for {
      xs  <- acc
      n   <- getState
      _   <- setState(n + 1)
    } yield (n, a) :: xs).runS(0)._1.reverse

Yoiks! What's this doing? Well, reading it de-sugared and pretending we've given it a list of Chars may make it easier:

    type IndexedChars = List[(Int, Char)]

    val accumulated: State[Int, IndexedChars] = as.foldLeft(
      pureState[Int, IndexedChars](List())  // "the next state is the last state and the accompanying value is an empty list"
    )((acc: State[Int, IndexedChars], a: Char) =>
      acc.flatMap { xs: IndexedChars =>     // xs is the accompanying value that the accumulator wraps
        getState.flatMap { n: Int =>        // "given a state, return that state" (n is fed in via runS)
          setState(n + 1).map { ignored =>  // "I don't care which state you give me, the next one is n+1"
            (n, a) +: xs                    // the accompanying value is now the old list plus the (n, a) tuple
          }
        }
      }
    )

Which is a bit more understandable.
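
And just to check it does what it claims, calling the original function (using the hand-rolled State and helpers above) gives:

    println(zipWithIndex(List('a', 'b', 'c'))) // List((0,a), (1,b), (2,c))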

First steps into Scalaz

Scalaz adds some nice features to Scala.

In the Haskell world, "functions are applicative functors. They allow us to operate on the eventual results of functions as if we already had their results" (from LYAHFGG).

(Where the "Functor typeclass, ... is basically for things that can be mapped over", "all applicatives are functors" [1] and applicatives have a function that has a type of f (a -> b) -> f a -> f b.)

So, why can't we map over functions in Scala? Well, now with Scalaz, we can!

import scalaz._
import Scalaz._

    val f: Int => Int = _ * 2
    val g: Int => Int = _ + 10

    val mapped = f map g
    println(mapped(7))        // "24", ie (7 * 2) + 10

which is the same as saying f.andThen(g) and so the opposite of saying f.compose(g).

We can also flatMap over functions now and as a result can use for comprehensions:

    val addStuff: Int => Int = for {
      a <- f
      b <- g
    } yield (a + b)

    println("addStuff = " + addStuff(7)) // "31"

Why this monadic behaviour is so nice is that "the syntactic sugar of the for-comprehensions abstracts the details nicely enough for the user, who is completely oblivious of the underlying machinery of binding monads" (from here).

We can even raise a plain old Int to the level of monad with:

println(3.point[Id].map(_ + 1)) // prints out "4"

By using point, the "constructor is abstracted out... Scalaz likes the name point instead of pure, and it seems like it’s basically a constructor that takes value A and returns F[A]." (from Learning Scalaz).
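
And point works the same way for other type constructors. A couple more quick checks (assuming the same Scalaz imports as above):

    println(1.point[List])   // List(1)
    println(1.point[Option]) // Some(1)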

[1] Functional Programming in Scala (Chiusano and Bjarnason)

Friday, October 21, 2016

A Spark Quick Win

If you're having trouble making Spark actually work, you're not alone. I can sympathize with the author of these gotchas. Something that had been killing me for weeks was how slow a Spark job was: tens of hours. Further investigation showed lots of GC - something like a third of the run time. Isn't Spark supposed to handle huge volumes of data?

This one tip proved to be a quick win: cache the data in serialized form.

This meant that the data held in memory was serialized. As a result, the memory footprint was about 80% smaller (we use Kryo). In itself, that's cool. But the great thing was that all our data now fitted into the memory of the cluster.
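
For RDD caching, that looks something like this (a minimal sketch; the app name and input path are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Use Kryo for serialization and keep cached data in memory in serialized form.
val conf = new SparkConf()
  .setAppName("quickWin")                                                  // illustrative name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val bigRdd = sc.textFile("hdfs:///some/big/dataset")                       // illustrative path
bigRdd.persist(StorageLevel.MEMORY_ONLY_SER)                               // serialized rather than raw Java objects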

Sure, Spark can process more data than fits in physical memory, but if it's constantly spilling data to disk and reading it back, that's not only slow but it also causes a lot of garbage collection.

With this setting, full garbage collections took only 30s for hours of processing.

The downside is that this obviously requires more CPU. But not much more, and a lot less than a JVM that's constantly garbage collecting.

Saturday, October 8, 2016

Maths and the Principle of Least Surprise

Well, it surprised me.

While Java's Integer.MAX_VALUE and Integer.MIN_VALUE really do represent the highest and lowest int values, the same is not true of Double. There, MAX_VALUE and MIN_VALUE represent the largest and smallest magnitudes a double can take; that is, both values are positive even though doubles are signed. The nomenclature is confusing but it makes sense (see here). Unlike ints, doubles are symmetric, so the smallest (most negative) number you can represent is -Double.MAX_VALUE.

Just when you get used to that, you find that Scala does it differently: Double.MinValue really is the smallest value a double can hold. That is,

Double.MinValue == -java.lang.Double.MAX_VALUE
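
A quick check in the Scala REPL (the printed numbers are just the IEEE 754 double limits):

println(java.lang.Double.MIN_VALUE)                            // 4.9E-324, the smallest positive magnitude
println(scala.Double.MinValue)                                 // -1.7976931348623157E308, the most negative double
println(scala.Double.MinValue == -java.lang.Double.MAX_VALUE)  // true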

It's debatable which is the most surprising.

Friday, October 7, 2016

HBase Troubleshooting

Nasty little problem. Connecting to HBase kept timing out after lots of errors that looked like:

client.RpcRetryingCaller: Call exception, tries=17, retries=35, retryTime=208879ms, msg row '' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=HOST,60020,1475135661112, seqNum=0

while logging in the HBase shell seemed to be OK.

HBase uses Zookeeper to keep a record of its regions. So, first I tried checking that:

> echo ruok | nc ZOOKEEPER_HOST 2181

Running a repair didn't seem to help either.

Time to look at the HBase transaction logs. They kept repeating every second or so something like:

master.splitLogManager: total tasks =3 unassigned = 0 tasks={ /hbase-secure/splitWAL/WALs/HOST,600201474532814834-splitting ... status = in_progess ...

WALs are write ahead logs. "HBase data updates are stored in a place in memory called memstore for fast write. In the event of a region server failure, the contents of the memstore are lost because they have not been saved to disk yet. To prevent data loss in such a scenario, the updates are persisted in a WAL file before they are stored in the memstore".

These WALs appeared to be constantly splitting. So, a quick look at the WALs directory was in order. It looked something like this:

drwxr-xr-x - hbase app-hdfs     0 2016-07-29 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532814834-splitting
drwxr-xr-x - hbase app-hdfs     0 2016-09-29 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532435621
drwxr-xr-x - hbase app-hdfs     0 2016-09-30 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532365837
drwxr-xr-x - hbase app-hdfs     0 2016-07-28 /apps/hbase/xxx/WALs/MACHINE_NAME,60020,1474532463823-splitting

Splitting is natural but shouldn't take too long (a split file from months ago is definitely a bad sign).

We were lucky that this was a test environment, so we could happily delete these splitting files and restart HBase (YMMV - think long and hard about doing that in a production environment...). But the problem went away for us.

Saturday, September 24, 2016

Probabilistic Programming - Part 1

Probabilistic programming is an exciting, new-ish area that is eating a lot of my spare time.

"Probabilistic Programming and Bayesian Methods for Hackers" is an excellent book I am playing with. Not only is it free (see here), you can download it and read it as an IPython book (with the command ipython notebook) and tinker with the equations (use shift-enter to execute them).

As you might have gathered, it's Python-based but there are moves to integrate the library it uses extensively (PyMC) with Spark.

In the Java/Scala world, Figaro is making in-roads. In this first post, I'm making notes on the Python implementation. In part two, I'm hoping to implement a Scala/Figaro version.

Bayes again

Bayesian probability requires a different way of thinking.

If frequentist and Bayesian inference were programming functions, with inputs being statistical problems, then the two would be different in what they return to the user. The frequentist inference function would return a number, representing an estimate (typically a summary statistic like the sample average etc.), whereas the Bayesian function would return probabilities.

For example, in our debugging problem above, calling the frequentist function with the argument "My code passed all tests; is my code bug-free?" would return a YES. On the other hand, asking our Bayesian function "Often my code has bugs. My code passed all tests; is my code bug-free?" would return something very different: probabilities of YES and NO. The function might return: YES, with probability 0.8; NO, with probability 0.2.

This is very different from the answer the frequentist function returned. Notice that the Bayesian function accepted an additional argument: "Often my code has bugs". This parameter is the prior. By including the prior parameter, we are telling the Bayesian function to include our belief about the situation. (from the book)
The formula for Bayesian probability goes like this:

P(A|X) = P(X|A) P(A) / P(X)

Which reads in English as: the probability of A given X is equal to the probability of X given A, multiplied by the probability of A by itself, and divided by the probability of X by itself.

P(A|X) is called our posterior and P(A) is called our prior.
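
To put numbers on the debugging example (the numbers here are made up purely for illustration): say the prior probability that the code is bug-free is P(A) = 0.2, tests always pass on bug-free code so P(X|A) = 1, and buggy code still passes the tests half the time so P(X|not A) = 0.5. Then P(X) = 1 × 0.2 + 0.5 × 0.8 = 0.6 and P(A|X) = 1 × 0.2 / 0.6 ≈ 0.33. Passing the tests raises our belief that the code is bug-free from 20% to about 33% - more confident, but nowhere near certain.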

Probability distributions refresher

Probability distributions can be discrete (integers) or continuous (real numbers). The distribution of a discrete variable is called a probability mass function; that of a continuous variable is called a probability density function.

One (of many) discrete distributions is the Poisson distribution, which dates back centuries and was derived to estimate the probabilities of rare, independent events such as cavalry horses losing a shoe in a given time period. It looks like:

P(Z = k) = λ^k e^-λ / k!,   k = 0, 1, 2, ...

and this has the convenient property that its expected value is λ (or, in maths speak, E[Z|λ] = λ).
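
For example (with numbers picked purely for illustration), if λ = 2 then the probability of seeing exactly k = 3 events is P(Z = 3) = 2^3 e^-2 / 3! = 8 × 0.1353 / 6 ≈ 0.18.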

So, this looks as good a distribution as any for our prior. The trouble is, what is λ? Aye, there's the rub. λ can be any positive real number, so to model it we can choose an exponential distribution. It is continuous and it, too, has a convenient property. The probability density function looks like:

f(z|λ) = λ e^-λz,   z >= 0

and the convenient fact is that E[Z|λ] = 1/λ.


The task is to deduce when (if at all) somebody's behavior changed given a list of the number of text messages they sent over a certain time period. To do this, the book uses PyMC, a Python library.

The workhorse of the code is the generation of simulated data:

@pm.deterministic
def lambda_(tau=tau, lambda_1=lambda_1, lambda_2=lambda_2):
    out = np.zeros(n_count_data)
    out[:tau] = lambda_1  # lambda before tau is lambda_1
    out[tau:] = lambda_2  # lambda after (and including) tau is lambda_2
    return out

which simply creates an array where the elements are one of two values. All the values before tau are lambda_1 and the rest are lambda_2. Simples.

"Inside the deterministic decorator [@pm.deterministic], the Stochastic variables passed in behave like scalars or Numpy arrays (if multivariable), and not like Stochastic variables."

By putting some print statements in, you can see that this function gets hit thousands of times when we feed it into the Poisson function, which is given to a Model function, which is passed to an MCMC function (Markov Chain Monte Carlo). Phew! The details can be found in the freely downloadable book and I'll explain more when I show the Figaro implementation.

"MCMC returns samples from the posterior distribution, not the distribution itself... MCMC performs a task similar to repeatedly asking "How likely is this pebble I found to be from the mountain I am searching for?", and completes its task by returning thousands of accepted pebbles in hopes of reconstructing the original mountain. In MCMC and PyMC lingo, the returned sequence of "pebbles" are the samples, cumulatively called the traces."

And it's these traces that (when graphed) can help us to see when it is most likely that behavior changed. I'll go into more detail with the Scala implementation in part 2.

Saturday, September 3, 2016

Yet more maths and graphs - Lagrange Multiplier

Here's a little maths trick called the Lagrange Multiplier that helps in finding the maximum/minimum of a function f(x, y) subject to a constraint, Φ(x, y) = constant, that also involves x and y. We "form the function

F(x, y) = f(x, y) + λ Φ(x, y) 

and set the two partial derivatives of F equal to zero. Then solve these two equations and the equation Φ(x, y) = constant for the three unknowns x, y and λ".

A nice proof can be found in the essential undergraduate text, Mathematical Methods in the Physical Sciences. Let's say we're trying to find the max/min of the function f(x, y). Obviously, at such a point the total differential of f is zero:

df = (∂f/∂x) dx + (∂f/∂y) dy = 0                               (1)

and say we've been given a constraint Φ(x, y) = constant. Once again, it should be obvious that:

dΦ = (∂Φ/∂x) dx + (∂Φ/∂y) dy = 0                               (2)

since Φ(x, y) is a constant.

Now, multiply the second equation by λ and add it to the first.

(∂f/∂x + λ ∂Φ/∂x) dx + (∂f/∂y + λ ∂Φ/∂y) dy = 0                (3)

Let's take that second term. Since λ is arbitrary, let's choose it to be such that:

∂f/∂y + λ ∂Φ/∂y = 0                                            (4)

(That is, we've decided

λ = - (∂f/∂y) / (∂Φ/∂y)                                        (5)

But it's not necessary to worry about that as we're not particularly interested in the value of λ. We just use it to solve the equations).

Similarly, we do the same for the first term of (3):

∂f/∂x + λ ∂Φ/∂x = 0                                            (6)

Now, with equations (4) and (6), plus the constraint Φ(x, y) = constant, we have enough information to find the values of x and y at the position of the min/max point.
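
As a quick illustration (a standard textbook-style example, not part of the proof above): maximise f(x, y) = xy subject to Φ(x, y) = x + y = 10. Equation (6) gives y + λ = 0 and equation (4) gives x + λ = 0, so x = y = -λ. Substituting into the constraint gives x = y = 5 (and λ = -5), so the maximum of xy on that line is 25.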

Why am I writing this post? Because we're going to need the Lagrange Multiplier shortly in partitioning a graph.