Supervisor strategy using an exponential back off algorithm

rroestenburg

In this blog post I'm going to show you how to use an exponential backoff algorithm in combination with a stopping supervisor strategy. In many cases you would like to retry some 'dangerous operation', something that can crash, for instance a call to an external service.

The OneForOneStrategy and AllForOneStrategy supervisor strategies take two arguments that define a retry window: maxNrOfRetries and withinTimeRange, which together cap the number of retries within a maximum time range. This retry mechanism retries as soon as possible and does not wait between retries.

In some cases you would like the retry to be delayed so that the failing service does not get overloaded with unnecessary retries; it might be busy recovering and that might take some time. Instead of overloading the failing service with the same request we would like to use an algorithm that decides to wait longer if the dangerous operation keeps failing.

Truncated exponential backoff is such an algorithm; it is also used in network congestion avoidance. The algorithm is quite simple:

import scala.concurrent.duration._
import scala.util.Random

case class ExponentialBackOff(slotTime: FiniteDuration, ceiling: Int = 10, stayAtCeiling: Boolean = false,
                              slot: Int = 1, rand: Random = new Random(), waitTime: FiniteDuration = Duration.Zero,
                              retries: Int = 0, resets: Int = 0, totalRetries: Long = 0) {
  def isStarted = retries > 0

  def reset(): ExponentialBackOff = {
    copy(slot = 1, waitTime = Duration.Zero, resets = resets + 1, retries = 0)
  }

  def nextBackOff: ExponentialBackOff = {
    def time: FiniteDuration = slotTime * times
    def times = {
      val exp = rand.nextInt(slot + 1)
      math.round(math.pow(2, exp) - 1)
    }
    if (slot >= ceiling && !stayAtCeiling) reset()
    else {
      // stay at the ceiling when stayAtCeiling is set, otherwise move to the next slot
      val newSlot = if (slot >= ceiling) ceiling else slot + 1
      copy(slot = newSlot,
        waitTime = time,
        retries = retries + 1,
        totalRetries = totalRetries + 1)
    }
  }
}

I've chosen to make the above code immutable so it is safer to use in concurrent code. Immutability is not always necessary inside actors, but it is better practice to combine immutable data structures with var than mutable data structures with val, in case you want to share the data with other actors in some way.

Both reset and nextBackOff return a new ExponentialBackOff. The algorithm advances through a series of slots. In every slot a random wait time is drawn, and the upper bound of that wait time grows exponentially with the slot number. If the back off is reset it starts at the first slot again.
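To get a feel for the delays this produces, here is a minimal sketch (plain Scala, the object name is mine) that prints the upper bound of the wait time per slot for a 10 millisecond slot time. In slot s the random exponent ranges from 0 to s, so the maximum wait is slotTime * (2^s - 1), doubling roughly every slot:

```scala
import scala.concurrent.duration._

object BackOffBounds extends App {
  val slotTime = 10.millis

  // In slot s the algorithm draws exp from 0 to s (inclusive), so the wait
  // time ranges from zero up to slotTime * (2^s - 1).
  val bounds = (1 to 5).map(s => s -> slotTime * (math.pow(2, s).toLong - 1))

  bounds.foreach { case (s, max) => println(s"slot $s: max wait $max") }
  // slot 1: max wait 10 milliseconds ... slot 5: max wait 310 milliseconds
}
```

With the defaults (ceiling of 10), the maximum wait before a reset is therefore slotTime * 1023, a bit over 10 seconds for a 10 millisecond slot time.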

Next we will look at how we can use this algorithm with Akka actors. We will use the standard let-it-crash approach and supervise an actor that can possibly crash. When the actor (I will call this actor the 'dangerous actor') crashes on the dangerous operation, its supervisor will stop the actor, create a new one, and continue by resending possibly failed messages, as well as any new messages, to the new actor. The supervisor is called a BackOffSender; it forwards all messages to a child actor (the DangerousActor).

The BackOffSender and the DangerousActor use a couple of messages to communicate. All these messages are kept in one object, the BackOffProtocol. Keeping messages together in a protocol object makes it easy to find messages that are related to each other. It is also simpler to see inside the actors which messages are used and which 'message protocol' they are part of.

object BackOffProtocol {
  /**
   * The message type that is used in the domain. The BackOffSender forwards this type of message
   * to its dangerous child, which performs some dangerous operation on it.
   */
  case class Msg(id: Long, data: String)

  /**
   * Used between the supervisor (the BackOffSender) and the child dangerous actor to track the
   * original sender of the Msg message. When a message fails for a while and succeeds after a
   * number of retries, the original sender of the Msg can be responded to.
   */
  case class TrackedMsg(msg: Msg, sender: ActorRef)

  /**
   * The dangerous actor needs to send this message to its supervisor (the BackOffSender) when it
   * has successfully processed the message. When the BackOffSender receives this message it resets
   * the back off algorithm.
   */
  case class Sent(id: Long)
}

The BackOffSender creates and recreates the dangerous actor from a Props object. Akka uses parental supervision, which means that a parent needs to supervise its children. The Props object contains all the information to create the dangerous actor. It also makes it possible for the BackOffSender to create a child without knowing how to construct it or which dependencies it needs.

The below code shows the BackOffSender.

class BackOffSender(dangerousProps: Props,
                    slotTime: FiniteDuration = 10.millis,
                    ceiling: Int = 10,
                    stayAtCeiling: Boolean = false) extends Actor with ActorLogging {
  import BackOffProtocol._
  var backOff = ExponentialBackOff(slotTime, ceiling, stayAtCeiling)

  override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
  var dangerousActor = context.actorOf(dangerousProps)
  context.watch(dangerousActor)
  // messages that have not been acknowledged (yet)
  var possiblyFailed = Vector[TrackedMsg]()
  var scheduleResult: Option[Cancellable] = None
  implicit val ec = context.system.dispatcher
  val Tick = "tick"

  def receive = {
    case msg: Msg =>
      val trackedMsg = TrackedMsg(msg, sender)
      possiblyFailed = possiblyFailed :+ trackedMsg
      // only send immediately if the backOff was not needed yet, or reset by an ack.
      // otherwise schedule with delay specified by backOff.
      if (!backOff.isStarted) dangerousActor.forward(trackedMsg)
      if (backOff.isStarted && scheduleResult.isEmpty) scheduleResult = Some(context.system.scheduler.scheduleOnce(backOff.waitTime, self, Tick))
    case Tick =>
      possiblyFailed.foreach(trackedMsg => dangerousActor.tell(trackedMsg, trackedMsg.sender))
      scheduleResult = None
    case Terminated(failedRef) =>
      // re-create and watch
      dangerousActor = context.actorOf(dangerousProps)
      context.watch(dangerousActor)
      // if there were failed messages, schedule them after the next wait in the back off alg.
      // if one of these fails, it will result in another termination and a possibly higher delay
      backOff = backOff.nextBackOff
      scheduleResult = Some(context.system.scheduler.scheduleOnce(backOff.waitTime, self, Tick))

    case Sent(idKey) =>
      //Ack of the dangerous actor. Remove successful message and reset the backOff.
      possiblyFailed = possiblyFailed.filterNot(tracked => tracked.msg.id == idKey)
      backOff = backOff.reset()
  }
}

The BackOffSender stores all messages and their related senders before forwarding them to the dangerous actor. Once the dangerous actor acknowledges the correct processing of a message with a Sent message it is removed from the list of possibly failed messages.

The BackOffSender uses the stopping strategy instead of the default restart strategy. Instead of restarting the dangerous actor it stops the actor when it crashes.

Messages to the dangerous actor are forwarded through the BackOffSender so that the dangerous actor can simply be stopped and recreated from scratch. The ActorRef to the dangerous actor changes when it is stopped and recreated, which is different from a restart with the Restart directive, where the ActorRef always refers to the right actor instance. Because the BackOffSender sits between the dangerous actor and the client that wants to interact with it, it has more options than just restarting the actor; it could even fall back to a different service, for instance.

The BackOffSender is both the supervisor and a monitor of the dangerous actor; it both creates and monitors the child. When the child is terminated the BackOffSender receives a Terminated message. When this happens the BackOffSender recreates the dangerous actor, watches it and schedules a delayed 'Tick' message to itself.

The delay is decided by the backoff algorithm. When the BackOffSender receives a Tick it tries to resend all the possibly failed messages. New messages that are received while the dangerous actor is failing are scheduled with a delay as well. When a Sent acknowledgement is received, the back off algorithm is reset and subsequent messages are again forwarded immediately to the dangerous actor.

The below example shows a fake DangerousActor that is used in the unit test.

/**
 * A 'Dangerous' actor which uses a dangerous resource to do its work.
 * This actor is stopped by its supervisor the moment it throws any Exception.
 */
class DangerousActor(dangerousResource: DangerousResource) extends Actor with ActorLogging {

  import BackOffProtocol._
  def receive = {
    case trackedMsg: TrackedMsg =>
      // do the dangerous operation
      val response = dangerousResource.danger(trackedMsg.msg.data)
      //indicate to supervisor that the dangerous operation was successful
      context.parent ! Sent(trackedMsg.msg.id)
      // respond to sender with the result
      sender ! trackedMsg.msg.copy(data = response)
  }
}

The presented example still has a few drawbacks that I'll leave up to the reader to solve. The possiblyFailed vector can grow indefinitely if the dangerous actor keeps failing. A good solution for this is to cap the size of the possiblyFailed vector and evict the oldest entries in LRU fashion when it exceeds the cap.
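One way to do the capping, sketched below in plain Scala (the cap value and helper name are mine): since the vector is ordered oldest-first, appending and then keeping only the last maxBuffered entries evicts the oldest messages first.

```scala
object BoundedBuffer extends App {
  // Hypothetical cap; tune it for your workload.
  val maxBuffered = 1000

  // Append an element and drop the oldest entries when the cap is exceeded.
  // The vector is ordered oldest-first, so takeRight keeps the newest entries.
  def appendBounded[A](buffer: Vector[A], elem: A): Vector[A] =
    (buffer :+ elem).takeRight(maxBuffered)

  val grown = (1 to 1005).foldLeft(Vector.empty[Int])(appendBounded)
  println(grown.size) // 1000: the buffer stays at the cap
  println(grown.head) // 6: the five oldest entries were evicted
}
```

In the BackOffSender this would replace the plain `possiblyFailed = possiblyFailed :+ trackedMsg` append, at the cost of silently dropping the oldest unacknowledged messages under sustained failure.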

The default stoppingStrategy uses the defaults for the OneForOneStrategy, which means it will retry forever. If you want to 'truncate' the exponential backoff you could change these default arguments and create your own custom supervisor strategy based on the default stoppingStrategy.

The full example including unit tests is available on github. There is also an example of a dangerous Akka Camel Producer which shows how you can back off from an HTTP server on a 500 response status.
If you would like to know more about Akka and its fault tolerance features refer to the great documentation online, or (shameless plug) buy Akka in Action at manning.com, which is available for early access! Chapter 3 on Fault Tolerance is going to be available very soon.
