Sunday, February 8, 2015

Things they don't tell you about Akka clustering


... or rather they do but it's hard to find. Akka promises to make multi-threaded, multi-JVM code easy. It's very nice but it's not necessarily easy.

Getting your actor system up and running is not too hard and there are good books on it. Derek Wyatt's book just missed out on covering clustering, which "would probably add a couple of hundred pages" anyway. Here I've tried to give the minimum needed to get a cluster up and running.

The thing to note is that your Akka actors don't need to change at all. It's all down to the configuration.

Ok, so let's start with a simple Actor that prints out everything he hears:

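import java.util.concurrent.atomic.AtomicInteger

import akka.actor.Actor

class PhillTheActor extends Actor {

  // Each instance takes its own number so we can tell the routees apart
  val myIndex = PhillTheActor.counter.incrementAndGet()

  override def receive: Receive = {
    case _ =>
      println("message received by actor #" + myIndex)
      Thread.sleep(1000) // very naughty: blocks the dispatcher thread
  }
}

object PhillTheActor {
  // Shared by every PhillTheActor instance in this JVM
  final val counter = new AtomicInteger(0)
}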

Very simple.

First, let's take the config. We'll create a class we can run:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ClusterApp extends App {

  println("started")

  val systemName = "testSystem"
  val myPort     = args(0) // port for this node, passed on the command line
  val seedPort   = "9119"  // port of the seed node

  val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(
      s"""
        akka {
          actor.provider = "akka.cluster.ClusterActorRefProvider"

          remote.log-remote-lifecycle-events = on
          remote.netty.tcp.port = $myPort
          remote.netty.tcp.hostname = 127.0.0.1

          cluster {
              seed-nodes = [
                  "akka.tcp://$systemName@127.0.0.1:$seedPort"
              ]
          }
        }
      """))
.
.

We've (arbitrarily) chosen port 9119 to be the port of the seed node. We could have chosen any reasonable port number. The node started with myPort equal to seedPort is the seed node, and it will also be the one calling the shots, as we'll see later.
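The config string above is just text, so it can be built by a small function. Here is a minimal sketch that renders the same settings for more than one seed port. The object name ClusterConfigSketch and its render method are made up for illustration and are not part of Akka; listing several seed nodes is also an assumption, since this post only uses one.

```scala
object ClusterConfigSketch {

  // Hypothetical helper, not part of Akka: renders settings like the ones
  // above, with one seed-node entry per port in `seedPorts`.
  def render(systemName: String, myPort: Int, seedPorts: Seq[Int]): String = {
    // Each seed node is a quoted address of the form akka.tcp://system@host:port
    val seedNodes = seedPorts
      .map(port => "\"" + s"akka.tcp://$systemName@127.0.0.1:$port" + "\"")
      .mkString(", ")

    s"""akka {
       |  actor.provider = "akka.cluster.ClusterActorRefProvider"
       |  remote.netty.tcp.port = $myPort
       |  remote.netty.tcp.hostname = "127.0.0.1"
       |  cluster.seed-nodes = [$seedNodes]
       |}""".stripMargin
  }
}
```

The resulting string could be handed to ConfigFactory.parseString in the same way as the inline config in ClusterApp.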

"The seed nodes are configured contact points for initial, automatic, join of the cluster... [Y]ou can only join to an existing cluster member, which means that for bootstrapping some node must join itself." [1]

Now, we need a local router:
 
.
.
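  import akka.actor.Props
  import akka.routing.RoundRobinPool

  val localSamplerActorName = "localSamplerActorName"

  // A pool router that creates two PhillTheActor routees as its children
  val localRouter =
    actorSystem.actorOf(RoundRobinPool(nrOfInstances = 2).props(Props[PhillTheActor]), localSamplerActorName)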
.
.
     
We'll be using localSamplerActorName later so keep an eye on that.
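If "round robin" is unfamiliar, the idea is simply to hand each new message to the next routee in turn. The sketch below is a simplified model of that idea in plain Scala. It is not Akka's implementation, and the class name RoundRobinSketch is made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicLong

// Simplified model for illustration only; this is not Akka's implementation.
final class RoundRobinSketch[A](routees: Vector[A]) {
  require(routees.nonEmpty, "need at least one routee")

  // Counts how many selections have been made so far
  private val next = new AtomicLong(0)

  // Pick the routee for the next message, cycling through the routees in order
  def select(): A = {
    val index = (next.getAndIncrement() % routees.size).toInt
    routees(index)
  }
}
```

With two routees, as in the pool above, successive calls to select() alternate between the first and the second.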

Then we need the cluster router. From the Scaladoc:

"akka.routing.RouterConfig implementation for deployment on cluster nodes. Delegates other duties to the local akka.routing.RouterConfig, which makes it possible to mix this with the built-in routers such as akka.routing.RoundRobinRouter or custom routers."

We only need this on one node so:

.
.
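  // Only the seed node creates the cluster-wide router
  if (myPort == seedPort) {
      import akka.cluster.routing.{AdaptiveLoadBalancingGroup, ClusterRouterGroup, ClusterRouterGroupSettings, HeapMetricsSelector}

      // Favour nodes that have more free heap
      val loadBalancingGroup = AdaptiveLoadBalancingGroup(HeapMetricsSelector)
      val clusterRouterGroupSettings = ClusterRouterGroupSettings(
          totalInstances = 3,
          routeesPaths = List(s"/user/$localSamplerActorName"),
          allowLocalRoutees = true,
          useRole = None
          )

      // Wrap the load-balancing logic so that it looks up routees on cluster members
      val clusterRouterGroup = ClusterRouterGroup(loadBalancingGroup, clusterRouterGroupSettings)
      val clusterActor       = actorSystem.actorOf(clusterRouterGroup.props, "clusterWideRouter")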
.
.

There's a lot going on here. The argument totalInstances is the maximum number of routees, across the whole cluster, that this router will send work to.

The routeesPaths argument lists the actor paths that the router looks up on each node. Here there is just one: the path of the local router (see above), built from the variable localSamplerActorName.

The allowLocalRoutees flag is straightforward: does this node also process work, or is it only a coordinator?

As for roles: "Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end, one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware routers—can take node roles into account to achieve this distribution of responsibilities." [1]

Notice that the cluster-wide balancer uses a Group and the local balancer uses a Pool. "The Akka team has introduced two different types of routers in Akka 2.3: Pool and Group. Pools are routers that manage their routees by themselves (creation and termination as child actors) whereas Groups are routers that get pre-configured routees from outside." (from this blog). So the local router, using a round-robin pool, will spin up actors for us, while the cluster-wide router looks up that already-created local router on each node by its path.

Finally, we send messages to this cluster just as we would any actor:

      while (true) {
        clusterActor ! "hello!"
        Thread.sleep(10)
      }

whereupon PhillTheActor starts printing out messages. If I start this app again with other ports as arguments, the PhillTheActor instances on those nodes print out messages too.

Obviously, there is much more to Akka clustering but this got my simple app up and running.

[1]  Akka Cluster Usage docs

Further Reading

Adam Warski's blog.
