Scala actors for the enterprise: introducing the Akka framework

Arjan Blokzijl

Last week me and some of my colleagues had the pleasure of being on the receiving end of an excellent training given by Jonas Bonér. The topic was his new pet project: the Akka framework. Perhaps you've played around with Scala lately, and also have taken the first steps in using its Actor library. Simply stated, an Actor is a unit of execution that (usually asynchronously) processes messages and encapsulate their state. An actor does not expose its state, and messages are processed sequentially. The Actor model has been around for quite some time, but today the best-known Actor implementation is Erlang.
The Actor model has been implemented in the standard Scala library by Philipp Haller (for the interested reader, a solid reference is for instance this article explaining how actors in Scala work). In most Actor examples written in Scala, it is not uncommon to find only EchoActors, PingPongActors, and FibonacciSolvingActors. Nice examples, but perhaps you might wonder if they are of any use in enterprise Scala at all. Next to this, if you're interested in concurrent, message passing processing models, STM's, NoSQL data stores, and occasionally wonder what the future in enterprise computing might bring, than Akka might be just the framework you're looking for. This blog is intended to provide a brief introduction into one feature of this framework: Akka's supervisor Actors. It is mostly based on the knowledge extracted from Jonas during the training, and I hope to whet your appetite for it.


First of all, Akka is likely to support what Ray Racine has coined to be a 'System of Service'. I'm quoting him here literally from the scala mailing list:

Its not too far of stretch to say commercial vendors, and most frameworks are very System Of Record oriented. Need to build a new app to maintain X.
Install RDB, app server, select O/R mapping and GUI form framework, place warm bodies in front of drag and drop IDEs and go for it.

A System Of Service must service 1,000s of requests per second in millisecs, 24/7/365 with 99.99 % reliability e.g. a pricing service. The business
logic has to run in microseconds. Your favorite O/R mapping framework hasn't even initiated a JDBC call, heck hasn't even allocated a connection from the pool and its already exhausted its 1 ms in allotted time.

An item may undergo a few 10s of price changes a year on the System Of Record, yet that item's price may be served 100,000 times for each change on the System Of Service'.

In other words a System of Service is a system designed to have extreme throughput, and almost zero downtime. It may not be your everyday CRUD application, but if you need to build them, it is nice to know of some framework that supports it. In response to Ray Racine's post Jonas Bonér posted his vision for the Akka framework. These posts provide plenty of highly interesting reading material, and can be found here.

Akka is therefore meant to be a kind of framework that supports a System Of Service application. It has many features, too much to cover all at once, and therefore we will start of with having a look at Akka's actors supervisor capabilities.
Akka has an Actor model with supervision trees, based on Erlang's OTP design principles. Supervisors are processes that monitor a set of workers, that do the actual processing. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary. A worker is an Actor, that receives and processes a message. When a worker dies abnormally (by throwing an Exception), it is up to the supervisor to determine what to do. It may decide to restart the worker, and try to process the message again, with a maximum number of retries set. The approach is to expect that failures can and eventually will happen. Instead of trying to prevent this, the idea is to Let it crash (TM Jonas Bonér), reset the worker to a stable state, restart it. This idea has served Ericsson well in building highly fault tolerant, distributed message passing software. We can now profit from this idea in Scala as well via Akka.

In order to achieve this, Akka has its own version of the Actors library, that have more 'system of service' features than the standard Actors library found in the Scala distribution. To demonstrate a simple example of a supervisor class, here's some sample code to demonstrate the idea. First, we start of with a simple Actor:

import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{OneForOneStrategy, Actor}

sealed trait Message

case class Supervise(worker: Worker) extends Message

case class DoWork(work: Work) extends Message

case object Die extends Message

class Worker(workerName:String) extends Actor {
lifeCycleConfig = Some(LifeCycle(Permanent, 100))

def receive: PartialFunction[Any, Unit] = {

case DoWork(work:String) =>
log.debug("start working... at: " + work)

case Reset =>
log.info("%s has been reset", toString)

case Die =>
log.debug("Dying...")
throw new RuntimeException("I'm dead: " + this.toString)

case other =>
log.error("Unknown event: %s", other)
}

override def preRestart(reason: AnyRef, config: Option[AnyRef]) {
log.debug("pre-restarting " + this)
}

override def postRestart(reason: AnyRef, config: Option[AnyRef]) {
log.debug("post-restarting " + this)
}

override def toString = "[" + workerName + "]"

As regards to sending message passing, an Akka actor follows the same API as the 'normal' Scala ones: you can send a message to an actor using the ! (pronounced bang) method. This is a 'fire and forget' message, which is then processed asynchronously by the actor. There are some more options to send messages, but we'll skip these for now. In Akka, Actors have to be started explicitly by calling actor.start for it to be able to process any messages. It can be stopped by calling actor.stop.
Compared to the standard Scala actors, the API for processing messages is more limited: there is exactly one method that you must override. This is the receive method, which must return a PartialFunction. The receive method uses a pattern match to distinguish between various messages it can process. A pattern match is compiled down to a PartialFunction, which is the return value of the pattern match.
To send messages to actors, we've defined a Message trait, which various case classes that represent the actual messages extend. In principle, you can send any message to an Actor, however, in practice however, the only messages you should send should be immutable. In Scala, case classes and case objects are immutable by default, and thus are a natural fit for this. In this case, we've defined a 'DoWork' message (all messages extends from the same Message trait), which should set the Worker actor to do something that hopefully will be be of some use. When the worker receives a Die message, it will throw an exception. This is a somewhat silly example of course, but hopefully it demonstrates the idea.

So far, nothing special, apart perhaps the pre- and postRestart methods, and the field lifeCycleConfig = Some(LifeCycle(Permanent, 100)). This is configuration for Akka's actors supervisor hierarchy capabilities. To see further how this works, let's code up a supervisor:

import se.scalablesolutions.akka.actor.OneForOneStrategy

object WorkerSupervisor extends Actor {
trapExit = true
faultHandler = Some(OneForOneStrategy(3, 100))

def receive: PartialFunction[Any, Unit] = {
case DoSupervise(worker:Worker) =>
log.info("Supervising worker: " + worker)
startLink(worker)

case unknown =>
log.error("Unknown event: %s", unknown)
}
}

Our supervisor is also an Akka actor (in this case, it is an object, i.e. a singleton in Scala), that processes 'DoSupervise(worker)' messages. It handles this message by starting to supervise the worker by the invoking the 'startLink(worker)' method, which is defined in the Actor class. This will add our worker to the list of its linked actors, and also handles starts the linked Actor at the same time. Of cours the nice thing is that the Actor class will handle thread safety for us, so we don't need to worry about any multithreading issues here.
The linking achieves exactly what it promises: if a linked Actor (the worker in our case) throws an exception, the supervising actor (our WorkerSupervisor) will be notified of this. For this, it is necessary to override the variable trapExit, and set it to true. It may then decide what to do based on the fault handler strategy Strategy that is defined by defining the faultHandler variable. There are currently two strategies: OneForOne, meaning that only the actor that has crashed will be restarted, and AllForOne, meaning all linked actors (including the crashed one) will be restarted. This is also where the lifeCycleConfig defined on our worker Actor comes in. LifeCycle(Permanent, 100) means our actor has a permanent LifeCycle and will always be restarted in case of an exception. The preRestart and postRestart are callback methods that may be overridden by the supervised actor to do some initialization work when it is restarted. Note that our supervised actors will probably be some kind of application services, instead of our useless Worker actor, and therefore may really need to do some usefull initialization work like setting up resources, connections, etc, in order to operate properly.

Instead of manually coding a Supervisor like we've done, there's an alternative in setting up a set of linked services in a more declarative way by extending Akka's SupervisorFactory class, as follows:

import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, SupervisorConfig, RestartStrategy, Supervise, OneForOne, Permanent}

class MySupervisorFactory extends SupervisorFactory {

val worker = new Worker("worker-1")

override protected def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 10),
Supervise(
worker,
LifeCycle(Permanent, 1000)) :: Nil)
}
}

object factory extends MySupervisorFactory

This defines a list of supervised actors (in our case, a one element list containing our worker) and then use the following code:

val supervisor = factory.newSupervisor
supervisor.startSupervisor

which will link all workers to one supervisor (defined within the framework) and (re)start them based on the configured strategy. You can therefore pick your preferred way of configuring.

All that remains is to see whether it actually works. For this we need a small simulation class, defined as follows:

class Simulation {

var worker:Worker = _

@Before
def setUp = {

worker = new Worker("worker-1")
WorkerSupervisor ! DoSupervise(worker)
}

@After
def tearDown = {
WorkerSupervisor.stop
}

@Test
def testSuperviseWorker = {
Thread.sleep(500)
println("\n===> start working")

worker ! DoWork("Some work")
Thread.sleep(500)

worker ! Die
Thread.sleep(1000)

worker ! DoWork("Some more work")

println("\n===> finished")

}
}

Running this shows output like the following:

INF [20091020-19:28:25.242] akka: Supervising worker: [worker-1]
DEB [20091020-19:28:25.242] akka: Linking actor [[worker-1]] to actor
[Actor[1256143591312:class akka.supervision.WorkerSupervisor$]]

===> start working
DEB [20091020-19:28:25.750] akka: start working... at: Some work
DEB [20091020-19:28:26.249] akka: Dying...
java.lang.RuntimeException: I'm dead: [worker-1]
	at akka.supervision.Workeranonfun$receive$1.apply(Worker.scala:48)
	at akka.supervision.Workeranonfun$receive$1.apply(Worker.scala:38)
	at scala.PartialFunctionanon$1.apply(PartialFunction.scala:38)
	at se.scalablesolutions.akka.actor.Actor$class.transactionalDispatch(Actor.scala:496)
	at se.scalablesolutions.akka.actor.Actor$class.invoke(Actor.scala:461)
	at akka.supervision.Worker.invoke(Worker.scala:33)
	at se.scalablesolutions.akka.actor.ActorMessageInvoker.invoke(Actor.scala:39)
	at se.scalablesolutions.akka.reactor.EventBasedThreadPoolDispatcheranon$1$$anon$2.
            run(EventBasedThreadPoolDispatcher.scala:99)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:619)
DEB [20091020-19:28:26.260] labs: pre-restarting [worker-1]
INF [20091020-19:28:26.262] labs: Restarting actor [class akka.supervision.Worker]
                                                         configured as PERMANENT.
DEB [20091020-19:28:26.265] labs: post-restarting [worker-1]

===> finished
DEB [20091020-19:28:27.251] labs: start working... at: Some more work

Process finished with exit code 0

Which indeed shows that our rudimentary supervisors work.
This concludes the introduction into Akka's supervising actors. There is much more that Akka's actors are capable of than shown here (remoting, transactions, STM, cassandra and mongo backend, to name just a few), but those are materials for future blogs. It may be perhaps a bit much to take all at once, as it requires to learn an API that is likely to be unknown to most of us. I won't deny that the Akka framework certainly has some learning curve, but the reward is great once you get the hang of it.
Akka is relatively young and still evolving. If you're interested in its future directions, check out the roadmap for the 0.6 release.

Comments (12)

  1. Raoul Duke - Reply

    October 22, 2009 at 8:04 pm

    @TM Jonas Boner

    uh, by far he isn't the first and shouldn't be able to TM it i feel :-)

    http://www.google.com/search?q="let+it+crash"+software

  2. J.F. Zarama - Reply

    October 22, 2009 at 11:44 pm

    Thanks for the article describing Boner's latest work; this material is a great reference to introduce concurrency and its implementation in Scala Actor's and now the proposed improvement as implemented by Akka. The minute I saw Boner's name I knew I had to read it and you have done a very good job explaining what, how and why of a subject that is difficult to understand and use for all of us but certainly more so for colleagues and personnel new to the subject; thanks again for a very good coverage of the topic.

  3. Albert Strasheim - Reply

    October 23, 2009 at 2:29 pm

    Where and how did you download and/or build the version of Akka you used?

    • Arjan Blokzijl - Reply

      October 23, 2009 at 4:48 pm

      Hi Albert,

      Akka is hosted on github, you can find it here: http://github.com/jboner/akka. You need to have git installed on your machine to checkout the source, for which you can use the following command: git clone git://github.com/jboner/akka.git.
      It builds with maven 2. In order to build it, you need to set the environment variable AKKA_HOME, and point it to the root of directory where you have checked out the sources.

      Cheers,
      Arjan

  4. notfancy - Reply

    October 23, 2009 at 6:19 pm

    > Last week me and some of my colleagues had the pleasure of being on the
    > receiving end of an excellent training given by Jonas Boner.

    This is not professional, people (protip: it's Bonér).

  5. Book Of JOSH - Reply

    October 24, 2009 at 1:29 am

    The next generation enterprise Systems Of Service platform (the next JBoss if you want to go for it).
    - JVM based
    - Data feed from backend traditional J2EE+RDB or legacy applications
    -- the Systems of Record or Maintenance if you will
    - distributed multicore 2-3+ nodes => 32 - 96 or more cores, (soon to be) commodity box cluster
    - pluggable/dynamic/module substrate OSGI, e.g. Felix, Karaf
    - Components written in Scala
    -- w liberal use of Actors w OTP policies, some STM as well
    - Paxos based node coordination and eventual data consistency synchronization
    - The majority if not all data in memory, check pointed on SSD.
    - Cluster of N nodes survives failures and keeps on chugging. 2F + 1 = N
    - Composite App ready, as API is HTTP, REST, JSON/SimpleXML (JOSH)

    None of the commercial guys will go there. IBM, Oracle, the whole lot are way too invested in their J2EE + RDB offerings. None of their marketing guys will let them touch the golden goose until its too late. Meanwhile a decent solution that can receive receiving data from a mainframe or ERP, that can serves prices or item availability or coupons, or ... in realtime, millions of times a day on commodity boxes, no outage windows, no downtime and no invoice for annual license fees in excess of 25K per cpu core _has_ a market.

    Wide open ... the next BEA. Sheesh where the hell are all the VC dudes with a brain at least a tenth the size of his ahmmm wallet when you need them!?! Too many tweety social thingy clones still to chase I suppose.

  6. Knowtu » links for 2009-10-26 - Reply

    October 27, 2009 at 2:07 am

    [...] Enterprise scala actors: introducing the Akka framework | Xebia Blog (tags: software scala concurrency) [...]

  7. Twitted by bubbl_scala - Reply

    November 23, 2009 at 10:53 am

    [...] This post was Twitted by bubbl_scala [...]

  8. Twitted by jboner - Reply

    November 25, 2009 at 10:06 am

    [...] This post was Twitted by jboner [...]

  9. Ulf Wiger - Reply

    January 11, 2010 at 9:01 am

    FWIW, I used the phrase "let it crash philosophy" in reference to Erlang programming in a paper published March 2001 (http://www.erlang.se/publications/Ulf_Wiger.pdf). It may have been the first reference to the exact phrase in a paper on Erlang (I'm not sure), but it was cited as a common expression:

    "This, in combination with the use of pattern matching, leads to a style of
    programming that is often referred to as 'programming for the correct case' or 'the Let it
    Crash philosophy.' The latter is of course a bit provocative, but seems to serve the
    purpose of countering many programmers’ instinctive urge to save the life of their
    process at all cost." (p 14)

    Joe Armstrong's PhD thesis compares the concepts to "fail-fast" programming in the Tandem computers, and Ericsson had a form of "fail-fast" programming with fine-grained recovery, in the form of "small restarts", from the beginning (early 70s) in the AXE architecture. The AXE architecture and its in-service performance was the Gold Standard, and the CSLab experiments mainly tried to come up with a more modern and productive way of programming telecoms, without sacrificing the reliability.

    Anyway, good concepts deserve to spread. The idea of using process links for cascading exits came from Mike Williams back in the 80s, who was inspired by the "C wire" in the old mechanical phone switches - when you pulled the C wire, all resources allocated for that phone call were released. Adding the ability to trap exit signals allowed one process to supervise another.

  10. System of Service « Tales from a Trading Desk - Reply

    September 7, 2010 at 10:06 am

    [...] Which neatly brings us to Ray Racine ‘System of Service’ – reference by Arjan over on Xebia. Akka is an interesting framework that offers the Actor pattern amongst other things [...]

  11. Wade Arnold » Scala is easier than PHP - Reply

    November 17, 2010 at 8:33 pm

    [...] manor. I have found that as I have moved from writing code that is designed to persist data to a system of service that consist of multiple UI and data service inputs and outputs with different hotspots and unique [...]

Add a Comment