14 April 2016

Practical Akka Persistence

Having used Akka Persistence in a live project, we at Digital Magic decided to write up our experience as a blog post. Here you'll find practical advice that will help you start using the framework while side-stepping its rough edges.

Familiarity with the actor model and its Akka implementation is assumed. Familiarity with Scala would help.



What does Akka Persistence get you?

Akka actors lose their internal state when stopped. Quite simply, the Akka Persistence framework enables stateful actors to:
  • persist their state to a durable store of your choice
  • recover actor state from a persistent store at actor startup
For example, you can model shopping cart entities as stateful actors. In case your application crashes and gets restarted, the in-memory state of your shopping cart actors would automatically be restored by the framework to its pre-crash state; the users of the system would keep shopping as if nothing had happened.

A little theory first.

Event Sourcing

Traditionally, an application keeps a snapshot of its current state in a database. Suppose the following actions have been performed by a system user:


  • a group G was created
  • a couple of users A and B were created
  • user A was added to the group
  • user B was added to the group
  • user A was removed from the group

The resulting state (in e.g. an RDBMS) looks as follows:

[Diagram of the resulting RDBMS state: group G with user B as its only member]

The history of events that led to this particular state is not recorded. Was user A ever a member of the group? There is no way to tell.

With event sourcing, instead of saving current application state, a sequential log of incremental changes to the state is stored. Each state change is captured in an event - an immutable fact of the past, something that has already happened. The event log is append-only, with no events ever updated or removed.

In order to reconstruct the current application state, all past events are replayed in their original order, and sequentially applied to the working state using a deterministic function {old state, event} => {new state}.

A familiar example of a system based on event sourcing is a version control system. Every repository change is captured as a commit, and you rebuild the current state of the code base by replaying all the commits into an empty directory.

Event sourcing can also be found in real world businesses – e.g. doctors gradually appending data to your medical profile, or formal contracts being updated by the means of addendums (not striking words out from the original contract).
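
To make the replay mechanics concrete, here is a minimal sketch (with illustrative event and state types) of rebuilding state by folding the event log over the deterministic application function:

// illustrative types: group membership state and its events
case class GroupState(members: Set[String])

sealed trait Event
case class UserAdded(user: String) extends Event
case class UserRemoved(user: String) extends Event

// the deterministic {old state, event} => {new state} function
def applyEvent(state: GroupState, event: Event): GroupState = event match {
  case UserAdded(user)   => GroupState(state.members + user)
  case UserRemoved(user) => GroupState(state.members - user)
}

// replaying the full log reconstructs the current state
val log = Seq(UserAdded("A"), UserAdded("B"), UserRemoved("A"))
val current = log.foldLeft(GroupState(Set.empty))(applyEvent)
// current == GroupState(Set("B")), yet the log still remembers
// that user A was once a member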

Advantages of event sourcing

  • With Event Sourcing, you have a complete log of every change ever applied to the system. This opens up business opportunities such as:
    • Additional data insight. Data about your business is valuable. For example, if some day you decide it would be useful to know which items customers remove from their shopping carts right before checking out, you could build such a report retrospectively, not just "from that day forward".
    • A complete audit trail - for free.
  • Excellent write performance characteristics due to "append-only" write pattern.
  • Easy to scale the write store up because there is no write contention or locking.
  • Easy to reproduce issues for debugging purposes – just replay the event log up to a particular point in time and debug from there.
    • The stream of events up to the point of failure might even become your test case setup.

Drawbacks of event sourcing

One drawback is an increased storage capacity requirement, as data never gets deleted. But data is valuable, so one might argue whether this is a drawback at all. The biggest problem, however, is that with event sourcing it is troublesome to query data.
  • All you have is an event log. How do you answer queries such as "is user B a member of group G"? Replaying the complete log each time in order to answer a query would be very sad, indeed.
  • How do you answer queries that need data to be aggregated from several event logs?
In order to combat the log querying problems, let's take advantage of the idea of CQRS, discussed next.


CQRS

CQRS stands for Command Query Responsibility Segregation. The idea is quite simple: you keep the same data in different data models, one for reads and one for writes. The log of events is turned into a model more suitable for querying by projecting the event store(s) into query store(s).

CQRS can be depicted as:

[Diagram: commands flow into the write store(s) as events, which are projected into the query store(s) that serve reads]

Advantages of CQRS


First and foremost, CQRS enables event sourcing to be practically applicable, by providing a way for data to be queried.
An important observation is that a single data model simply cannot be equally appropriate for all kinds of queries and for writes at the same time. Trying to make a single data model fit everywhere adds inherent complexity to the system.
With CQRS, you can optimize the write and query stores independently. You might want to store the event log in a data store optimized for sequential append-only writes (such as Cassandra), while having your audit trail reside in Elasticsearch specializing in full-text search, still having your documents served by MongoDB.

You might project portions of your data into a relational database and have the tables optimized for queries by giving up normal forms, not worrying about data duplication or data inconsistencies because there is only one system of record - the event log, from which the query stores may be rebuilt at any time.

Another benefit is that you can scale your read stores independently of write stores. It is typical for queries to happen orders of magnitude more often than writes.

Drawbacks of CQRS


The query side has to be prepared for its requests being served with relaxed (eventual) consistency, because the projection of the write store to the query store is typically not instantaneous. However, read on about serving simple queries in a consistent manner from in-memory state.
CQRS also implies increased storage capacity requirements, as the same data resides in several data stores (and probably in non-normal forms, as well).

Akka Persistence

In short, the Akka Persistence framework is an implementation of Event Sourcing and CQRS on top of Akka.

Akka Persistence comprises the following components:
  • Persistent Actor
  • Event Journal
  • Snapshot Store
  • Akka Persistence Query (experimental API as of Akka 2.4)
  • At-least-once delivery
  • Persistent FSM (experimental API as of Akka 2.4)
Let's cover each of these separately.

Persistent Actor


If you wish your entity actor's state to be protected from loss at stop or crash, and recovered at actor startup, use the Persistent Actor interface.

The key concepts around the interface are:

  • Commands - imperative instructions for the actor to do something; these are plain vanilla Akka messages. A command may or may not result in a state change.
  • Events - the same concept as discussed in the Event Sourcing section. Events capture state changes, and are immutable facts of the past.
  • State - the working state of an actor is the result of sequentially applying all the events, using a deterministic application function.
Here is an example of the concepts bound together:


Suppose a user management actor is commanded to grant the administrator role to the user identified by rob@corp.com. The actor first checks whether such a user exists, whether he already has the role, whether the acting user has sufficient rights to grant roles to other users, and so on. If all checks pass, an event is created (note the past tense: "role has been added"). To update the actor state so that it reflects the effects of the event, the deterministic event application function {old state, event} => {new state} is called.

Event Journal


This concept is the same as was discussed in the Event Sourcing section. An event journal is an append-only immutable log of events. Each persistent actor has exactly one journal, identified by a persistenceId, and you must ensure that no two actors within a cluster have the same persistenceId. Persistent actors have to be cluster singletons.

The event journal can be backed by a selection of data stores; there are a number of community plugins for RDBMS, NoSQL, document stores, and so on. Cassandra is a good fit for the journal backing store because it is optimized for writes.
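
For example, assuming the akka-persistence-cassandra plugin is on the classpath, pointing the journal and the snapshot store at Cassandra is a matter of configuration (a typical application.conf sketch; the contact point is illustrative):

akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"

# where the Cassandra cluster lives
cassandra-journal.contact-points = ["127.0.0.1"]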


Let's consider a simple example of a domain model:



object Domain {
  // Commands – the external stimuli, these do not get persisted
  sealed trait Command
  case class Deposit(amount: Int) extends Command
  case class Withdraw(amount: Int) extends Command

  // Events - reflect effects that have already happened
  sealed trait Event
  case class BalanceChanged(delta: Int) extends Event

  // State - the reflection of a series of events
  case class Account(balance: Int) {
    require(balance >= 0)

    // the deterministic event application function
    def apply(event: Event): Account = event match {
      case BalanceChanged(delta) =>
        Account(balance + delta)
    }
  }
}



Actors that want their state persisted have to extend the PersistentActor trait:

import akka.persistence.PersistentActor
import Domain._

class AccountActor(val id: String) extends PersistentActor {
  // identifies the persistent journal; it must not
  // change between actor incarnations
  override val persistenceId = s"Account.$id"

  var state = Domain.Account(balance = 0)

  // receives events during actor startup
  override def receiveRecover: Receive = {
    case ev: Event =>
      state = state.apply(ev)
  }

  // receives commands during normal operation
  override def receiveCommand: Receive = {
    case Withdraw(amount) =>
      if (state.balance < amount) {
        sender() ! "not enough funds"
      } else {
        // the persist callback is asynchronous, but Akka guarantees
        // it is safe to access and update the state inside it
        persist(BalanceChanged(-amount)) { changed =>
          // the only safe place to apply the state mutation caused by the event
          state = state.apply(changed)
          sender() ! s"successfully withdrawn $amount"
        }

        // WRONG: here state.balance is unpredictable, because the
        // persist callback above may not have run yet
        // sender() ! state.balance
      }
  }
}



About actor state


The Akka persistence API does not specify the way an actor’s state is to be kept. A good choice is to keep an actor’s state in a single variable, because:
  • when you start using snapshots it’d be easier to write the whole state in one shot
  • it’s easy to forget to update something if state is scattered over a dozen var variables or behaviour parameters
It's a good idea to split an actor's state into two parts: persistent and transient. The transient part must be reconstructable from the persistent part at any moment. An example is a dictionary of objects (id -> object) together with a reverse index (object -> id). During recovery, it is advisable to reconstruct the transient state only after recovery has completed, to save computational resources, as sketched below.
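
Here is a minimal sketch of the idea (the names are illustrative). The reverse index is declared as a lazy val, so it is not recomputed for every replayed event and only gets built on first access, after recovery has completed:

import akka.persistence.PersistentActor

// illustrative event
case class ObjectAdded(id: Long, obj: String)

case class RegistryState(objectsById: Map[Long, String]) {
  // the transient part: reconstructable from the persistent part
  // at any moment; being lazy, it is not rebuilt during event replay
  lazy val idsByObject: Map[String, Long] = objectsById.map(_.swap)
}

class Registry extends PersistentActor {
  override val persistenceId = "Registry"

  var state = RegistryState(Map.empty)

  override def receiveRecover: Receive = {
    case ObjectAdded(id, obj) =>
      state = RegistryState(state.objectsById + (id -> obj))
  }

  override def receiveCommand: Receive = {
    // reverse lookups are served by the transient index
    case obj: String => sender() ! state.idsByObject.get(obj)
  }
}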



Snapshot Store


State snapshots are completely optional. A snapshot captures an actor's current internal state. Snapshots can be written periodically to bound the recovery time of actors with long journals.

For example, suppose an actor has written 3 events, then a snapshot of the state reflecting those 3 events, and then two more events:

[Diagram: a journal containing events 1-3, a snapshot taken after event 3, then events 4 and 5]

In this case, upon actor startup, state recovery starts off with the latest snapshot, not an empty state, so only the events 4 and 5 will have to be read and applied.



Generally, here's how an actor is started up:


  • Run constructor, set initial state to empty
  • Receive latest snapshot, if any, and replace the initial state
  • Reconstruct the current actor state:
    • if there was a snapshot, apply only those events coming after it
    • if there are no snapshots, apply all events
    • in both cases, the resulting state is the same
  • Any incoming messages are held (stashed) until recovery completion

There is, of course, also an API for an actor to save snapshots of its current state and to receive a snapshot during recovery. The framework does not enforce any rules for when and how to write snapshots; it only provides the API:

import akka.persistence._

// SaveSnapshot is our own trigger message; SnapshotHelper is our helper
// that decides when it is time to write the next snapshot
case object SaveSnapshot

override def receiveCommand: Receive = {
  case cmd: Command =>
    persist(Event.from(cmd)) { ev =>
      // change the state now that the event is persisted
      state = state.process(ev)
      if (SnapshotHelper.isTimeToSnapshot) {
        // trigger a snapshot write
        self ! SaveSnapshot
      }
    }
  case SaveSnapshot =>
    saveSnapshot(state)
  case SaveSnapshotSuccess(metadata) =>
    // retain only the latest snapshot to ease the load on the database
    val criteria = SnapshotSelectionCriteria(maxSequenceNr = metadata.sequenceNr - 1)
    deleteSnapshots(criteria)
  case SaveSnapshotFailure(metadata, reason) =>
    // not fatal - just log an error
}

Note that you can also maintain a journal of commands (in addition to saving events), but this is optional – it can be useful for getting a sense of how the system is used, and for debugging or testing.

The snapshot store can be backed by a selection of data stores, and there is a number of community plugins.

Persistence Query


Akka Persistence Query is what implements the query side of CQRS. The API provides a stream of journaled events (an Akka Streams Source). The stream is transformed and redirected to the query store, where it can be queried later.

Note that there is still a single system of record: the event log. Query stores can be deleted and re-populated at any time with no data loss.
Also note that the same event journal can be projected into several query stores, and several event journals can be projected into the same query store.

Let's consider a practical example. We want to be able to query an audit trail of all modifications done to our system. The easiest way is to stream all events to an Elasticsearch index, so that the index could later be queried using Kibana.

Note that Elasticsearch may reject or lose writes (due to e.g. field mapping conflicts or network partitions), but these durability issues do not mean we should avoid using it. Just don't use it as the primary database, and rebuild the index in case it gets corrupted. That is a perfect task for Akka Persistence Query, which will facilitate the projection of our events to Elasticsearch.

Akka Persistence Query itself is not the query side of an application (it does not serve queries) – it is merely a transport facilitating migration of events from the write store (Cassandra in our case) to the actual query store (Elasticsearch in our case).

Read journal implementations typically have an API to provide a typed stream of events, which can then be transformed by Akka Streams combinators (to e.g. publish events to the query store).

Let's examine a complete example of an event publisher to Elasticsearch. The publisher uses Akka Persistence Query to obtain an Akka Streams stream of events from many event logs, but it is also a PersistentActor itself, because it needs to implement a resumable projection (memorizing the last event published, so that it can start where it left off last time).



import java.util.UUID

import akka.pattern.ask
import akka.persistence.PersistentActor
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

import Domain._

// our own message carrying the offset to be memorized
case class SetLastOffsetWritten(offset: UUID)

class Publisher extends PersistentActor {

  override val persistenceId = "Publisher"

  implicit val materializer = ActorMaterializer()
  implicit val askTimeout = Timeout(10.seconds)
  import context.dispatcher

  // memorize the last offset written
  var state: Option[UUID] = None

  override def receiveRecover: Receive = {
    case offset: UUID => state = Some(offset)
  }

  override def receiveCommand: Receive = {
    case SetLastOffsetWritten(offset) =>
      persist(offset) { o =>
        state = Some(o)
        sender() ! o // acknowledge, releasing the stream to move on
      }
  }

  def engageStreaming = {

    // obtain a read journal
    val readJournal = PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal]("cassandra-query-journal")

    // start streaming from the last offset written
    val initialUuid = state.getOrElse(readJournal.firstOffset)

    // events to be published have to be tagged with "publish_me"
    // using an Event Adapter during the write flow;
    // eventsByTag gets you an endless stream of typed events
    readJournal.eventsByTag("publish_me", initialUuid)
      // publish up to 4 events in parallel, preserving order downstream
      .mapAsync(4) { envelope =>

        // the write to Elasticsearch has to be idempotent,
        // because a failed publishing attempt might be re-run;
        // achieve this by deriving a stable id for the event
        val id = s"${envelope.persistenceId}.${envelope.sequenceNr}"

        val elasticFuture = envelope.event match {
          case e: Event => Future {
            // asynchronously write event e to Elasticsearch under id
            ElasticSearchWriter.write(e, id)
          }
        }

        // once the event is written, map to its offset
        elasticFuture.map(_ => envelope.offset)
      }
      .mapAsync(1) { offset =>
        // persist the 'last offset written' prior to moving forward,
        // handled by receiveCommand above;
        // note that the ask pattern yields a Future[Any]
        self ? SetLastOffsetWritten(offset)
      }
      .runWith(Sink.ignore)
      .recover { case e =>
        // by default the stream terminates upon any exception;
        // handle errors by shutting the actor down
        context.stop(self)
      }
  }
}

A few words regarding the publisher code above:

  • The publisher actor is a Persistent Actor itself - it memorizes the events that have already been streamed to and acknowledged by Elasticsearch, and does not stream them again (a resumable projection).
  • In case the publisher crashes with some events being in flight to/from Elasticsearch, those events would be re-sent after recovery. However, Elasticsearch document IDs are based on event sequence numbers that are stable and unique for each event, so the processing at Elasticsearch is effectively idempotent.
  • This means that events are delivered to Elasticsearch in an effectively exactly-once manner.

Note that not all journals support the eventsByTag call; without it, implementing a resumable projection is much more difficult (although not impossible). For example, the Cassandra journal 2.x does not support it, while 3.x does.

Basically you can have as many different materialized projections as you want, and all of them can use different data models and different db engines. Most systems actually need to have more than one read model to work well.

You can even project a projection if you so choose.

As mentioned above, it is important to note that read consistency is eventual: it takes a little while (seconds) for persisted events to make it into the projected stream. Most of the time queries can tolerate that, but not always. Where consistency is important, use the actor's in-memory state to serve simple queries, for example "check whether an email is available, and bind it to the account if it is", as sketched below.
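
A minimal sketch of that example (actor and message names are illustrative): because the persistent actor is the single writer of its own state, the availability check and the bind cannot race with a concurrent registration.

import akka.persistence.PersistentActor

case class BindEmail(email: String)  // command
case class EmailBound(email: String) // event

class Registrations extends PersistentActor {
  override val persistenceId = "Registrations"

  var boundEmails = Set.empty[String]

  override def receiveRecover: Receive = {
    case EmailBound(email) => boundEmails += email
  }

  override def receiveCommand: Receive = {
    case BindEmail(email) =>
      // a consistent check-and-bind served from in-memory state
      if (boundEmails.contains(email)) {
        sender() ! "email already taken"
      } else {
        persist(EmailBound(email)) { ev =>
          boundEmails += ev.email
          sender() ! "email bound"
        }
      }
  }
}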

At least once message delivery


Standard Akka provides the following guarantees:
  • message delivery guarantee: at-most-once 
  • ordering guarantees: peer to peer 
Imagine that at the moment of an application crash, your actor was waiting for a response to a message it had sent to another actor earlier. During recovery, both actors would be caught up to their current state, but the request would not automatically be re-sent, and the sending actor would keep waiting for a response that would never arrive.

Akka Persistence offers a reliable message delivery API via the AtLeastOnceDelivery interface, which takes care of re-sending messages when they have not been confirmed within a timeout.

The key concept here is that the sender persists the following facts (in the form of events, of course):
  • message has been sent to recipient
  • message has been acknowledged by the recipient
During actor startup (e.g. after a crash), the framework will prompt the persistent actor to re-send any messages that were "sent" but not "acknowledged".

A few words about the communication protocol:
  • The recipient has to send message acknowledgements.
  • The recipient has to:
    • process messages in an idempotent manner, or
    • have facilities to detect and handle duplicate messages
  • Message ordering is no longer guaranteed due to re-transmissions
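
A minimal sketch of the sending side, close to the example in the Akka documentation (the message and event names are illustrative):

import akka.actor.ActorPath
import akka.persistence.{AtLeastOnceDelivery, PersistentActor}

case class Msg(deliveryId: Long, payload: String)
case class Confirm(deliveryId: Long)

sealed trait Evt
case class MsgSent(payload: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt

class ReliableSender(destination: ActorPath) extends PersistentActor with AtLeastOnceDelivery {
  override val persistenceId = "ReliableSender"

  override def receiveCommand: Receive = {
    case payload: String =>
      // persist the fact "message has been sent", then deliver
      persist(MsgSent(payload))(updateState)
    case Confirm(deliveryId) =>
      // persist the fact "message has been acknowledged by the recipient"
      persist(MsgConfirmed(deliveryId))(updateState)
  }

  override def receiveRecover: Receive = {
    // replaying MsgSent re-registers the delivery; anything still
    // unconfirmed is re-sent automatically after recovery
    case evt: Evt => updateState(evt)
  }

  def updateState(evt: Evt): Unit = evt match {
    case MsgSent(payload) =>
      deliver(destination)(deliveryId => Msg(deliveryId, payload))
    case MsgConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }
}

The recipient replies with Confirm(deliveryId) once it has processed the message; until that confirmation is persisted, the framework keeps re-sending Msg.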

An example system


Depicted below is an architecture that takes advantage of most of the things discussed so far.


Write journals are backed by Cassandra, as it is a data store optimized for writes.

A query store is backed by Elasticsearch, which gets populated by events from several event journals by means of Akka Persistence Queries.
  • Event log projection to Elasticsearch is resumable - the publisher memorizes the "last published" position within the events stream using Akka Persistence.
  • Message delivery to Elasticsearch is "exactly-once" due to stable document IDs and idempotent processing of them at Elasticsearch.
There are two read models:
  1. Simple queries that require a consistent view of the state are served by the actors themselves, from their in-memory state.
  2. Complex queries (requiring data aggregation) that can tolerate relaxed consistency are served by Elasticsearch.
The in-memory state can be considered a simple form of a read store. This architecture is known as MemoryImage: the current state is kept in, and queried from, main memory. Naturally, the current state has to fit into memory.

The benefits of an in-memory image are as follows:
  • you don't need to maintain a separate, always up-to-date persistent copy of the application state, nor keep the two in sync
    • in case of an application crash, the in-memory state gets restored from the journal 
  • you get high performance, since queries are served from in-memory state with no IO or remote calls to database systems
  • you get rid of database mapping code

Failures

A persistent actor gets unconditionally stopped in case:
  • persistence of an event fails
  • state recovery during actor startup fails
This is done in order to avoid data inconsistencies.

This means that the Akka supervision hierarchy you are familiar with no longer works as expected: the actor has to be restarted not only after a crash, but also after a stop.

Additionally, in case the problem lies in database connectivity, it would be reasonable to wait for a bit prior to re-creating the actor (and retrying recovery) to avoid the "thundering herd" problem.


In order to handle all of the above, consider the following explicit supervisor:



import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import scala.util.control.NonFatal

// extend the default decider, which only restarts on subclasses of
// java.lang.Exception, to also restart on any non-fatal Throwable
def supervisorStrategy = OneForOneStrategy()(SupervisorStrategy.defaultDecider.orElse {
  case NonFatal(_) => SupervisorStrategy.Restart
})


import akka.pattern.{Backoff, BackoffSupervisor}
import scala.concurrent.duration._

val backoffSupervisorProps = BackoffSupervisor.props(
  Backoff.onStop(
    actorProps, // your wrapped (supervised) actor props
    childName = actorName + "Supervised",
    minBackoff = 3.seconds,
    maxBackoff = 10.seconds,
    randomFactor = 0.2)
    .withSupervisorStrategy(supervisorStrategy))

context.actorOf(backoffSupervisorProps, actorName)

Created this way, the actor gets restarted according to the specified supervision strategy after a crash, and it also gets re-created after a backoff period whenever it has been stopped by the Akka Persistence framework.


Serialization

By default, Akka Persistence uses Java serialization to store your events. This is not advised, mainly because it does not handle schema evolution well (and it is rumored to be quite slow). It's best to serialize events as generic data structures such as maps and lists, with as little class information as possible.
We serialize our events to JSON because:


  • JSON makes it easier to handle schema evolution. For example, removing fields or adding fields that have a default value is basically free.
  • it's human-readable (easier to debug)  
  • we use the same serialization engine for both persisting events to Cassandra and publishing events to Elasticsearch  
  • no vendor lock-in (unlike Kryo or Scala Pickling)
  • using spray-json + spray-json-shapeless keeps boilerplate to a minimum (no schema needed, plenty of built-in support for Scala types, and whole type hierarchies get serialized automatically by the json-shapeless library)
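
For illustration, here is a sketch of what such a serializer can look like, using Akka's SerializerWithStringManifest. DomainJsonProtocol is an assumed object providing an implicit spray-json format for our Event hierarchy (e.g. derived by spray-json-shapeless); the identifier is arbitrary but must be unique within the actor system:

import akka.serialization.SerializerWithStringManifest
import spray.json._

import Domain.Event
import DomainJsonProtocol._ // assumed: implicit RootJsonFormat[Event]

class JsonEventSerializer extends SerializerWithStringManifest {

  // must be unique among all serializers of the actor system
  override val identifier: Int = 20160414

  override def manifest(o: AnyRef): String = o.getClass.getName

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case e: Event => e.toJson.compactPrint.getBytes("UTF-8")
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    new String(bytes, "UTF-8").parseJson.convertTo[Event]
}

The serializer is then bound to the event types via akka.actor.serializers and serialization-bindings in the configuration.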



However, there is a price to pay: performance. For example, JSON serialization turned out to be a major CPU consumer during recovery in our application.


When serializing your state for snapshots, Java serialization is also used by default. Here it might be reasonable to just use the fastest serialization available: if the types of your internal state ever change, the old snapshots can simply be dropped, and the state rebuilt from events by replaying the whole stream again.


Persistent FSM

If you wish your persistent actors to process commands in an FSM-like fashion, Akka provides the PersistentFSM API.

Consider an actor representing a digital lock that can be unlocked by entering a combination of digits. The lock accepts the following commands (not persisted): 



sealed trait Command 
case object Lock extends Command 
case class EnterDigit(digit: Digit) extends Command 
case object TryUnlock extends Command

The lock can be in one of the following states (persisted):

sealed trait LockState extends FSMState

case object Locked extends LockState {
  override def identifier: String = "Locked"
}

case object Unlocked extends LockState {
  override def identifier: String = "Unlocked"
}

Any actions performed with the lock are recorded as a sequence of "domain events", which are persisted. Those events are replayed on an actor's startup in order to restore the latest state:



// note the past tense
sealed trait DomainEvent
case object ComboCleared extends DomainEvent
case class DigitEntered(digit: Digit) extends DomainEvent
case class IntrusionDetected(data: Seq[Digit]) extends DomainEvent

The state data represents the sequence of digits entered so far (not persisted as such, but reconstructed from events on recovery):
Seq[Digit]

The following DSL binds everything up into an FSM:


startWith(Unlocked, Seq())

when(Unlocked) {
  case Event(Lock, _) =>
    // no domain event to apply - only the state change itself is persisted
    goto(Locked) andThen {
      // perform a side effect once persistence has completed
      case _ => reportActor ! "the lock is secure"
    }
}

when(Locked) {
  case Event(EnterDigit(digit), currentCombo) =>
    // persist the DigitEntered event only
    stay applying DigitEntered(digit)
  case Event(TryUnlock, currentCombo) =>
    if (currentCombo == SECRET_COMBO) {
      goto(Unlocked) applying ComboCleared replying "welcome to the realm"
    } else {
      // persist two events: clear the combo and record the intrusion
      stay applying (ComboCleared, IntrusionDetected(currentCombo))
    }
}

The following function constructs the state data from events:


override def applyEvent(event: DomainEvent, stateBeforeEvent: Seq[Digit]): Seq[Digit] =
  event match {
    case ComboCleared => Seq()
    case DigitEntered(digit: Digit) => stateBeforeEvent :+ digit
    case IntrusionDetected(_) =>
      stateBeforeEvent // persisted for audit purposes only
  }

During recovery, the FSM state is restored automatically, and the state data is rebuilt by repeatedly calling the applyEvent function above for each persisted event.

We've experienced a few issues with the API, though:


  • Built-in forMax() state timers have a bug (#19688): they work incorrectly on recovery. Use explicit timers as a workaround.
  • We have not found a way to use snapshots with PersistentFSM, so FSM actors should not have long journals.
  • There is no built-in analogue of the at-least-once delivery API discussed above.
  • There is no support for transient states (e.g. states to which you don't want to recover).


Persistent actor in a cluster

A persistent actor must be a cluster singleton (its persistenceId must be unique per cluster): at any moment, at most one actor should be writing events into a given journal. If you have a cluster, but all your persistent actors fit into the memory of a single machine, you can use the ClusterSingleton manager, which does its best to ensure there is only one instance of each persistent actor, located on the oldest node.
[Diagram: a cluster singleton persistent actor running on the oldest node of the cluster]
If your persistent actors do not fit into the memory of a single JVM, you can use the Cluster Sharding manager to spread persistent actors evenly across the cluster.

[Diagram: persistent actors sharded evenly across the nodes of the cluster]
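
For example, with Cluster Sharding (Akka 2.4), entity actors are created on demand and addressed via an id extracted from incoming messages. A sketch, where the envelope type and the shard count are illustrative:

import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

// an assumed envelope carrying the target entity id
case class AccountEnvelope(accountId: String, payload: Any)

val extractEntityId: ShardRegion.ExtractEntityId = {
  case AccountEnvelope(id, payload) => (id, payload)
}

val numberOfShards = 100
val extractShardId: ShardRegion.ExtractShardId = {
  case AccountEnvelope(id, _) => (math.abs(id.hashCode) % numberOfShards).toString
}

val system = ActorSystem("cluster")
val accountRegion = ClusterSharding(system).start(
  typeName = "Account",
  // in practice the entity would derive its id (and persistenceId)
  // from self.path.name rather than from a constructor argument
  entityProps = Props(new AccountActor("shared")),
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId)

// commands are sent via the region, which routes them to the right node
accountRegion ! AccountEnvelope("42", Withdraw(100))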

Performance considerations

An application using Akka Persistent actors has been tested on the following setup:
  • Storage (Journal/Snapshots): Cassandra 3.3 - m3.medium (AWS) - 3-node cluster
  • Application: Akka 2.4.2, Play 2.4 - c4.large (AWS)
  • Total number of journaled events: ~1.7M (result of ~1.5 day under 200 req/sec load)
The table below shows the effect of Akka Persistence on normal system operation. In short, persisting events has little impact, but the more often snapshots are written, the higher the resource consumption:

Scenario                                  CPU, %   Transmit, KB/s   Avg GC pause, ms
Nothing gets persisted                    79       185              3..4
Events get persisted, but no snapshots    79       195              6
Snapshots every 10K events                ~81      200              5
Snapshots every 200 events                ~84      320              8
The table below shows the effect of snapshot frequency on actor recovery. In short, more frequent snapshots directly translate into shorter recovery times:
Snapshot frequency   Recovery time   Total events recovered
No snapshots         5m 21s          1700K
Every 10K events     36s             83K
Every 200 events     11s             12K
To summarize: in our particular application, event persistence is lightweight, but snapshotting is not.
However, snapshots do significantly reduce recovery times.