20 December 2018

Removing the pesky Caps Lock delay on macOS Mojave

You may have noticed that in order to engage Caps Lock on your Apple computer, you have to hold the button down for about 100 milliseconds; shorter keypresses are just ignored.

Personally, I type fast, and all my keypresses are equally brief, so for me this "feature" means Caps Lock almost never gets activated when I expect it to, which is frustrating.

Apple does not seem to provide any means to configure this delay.

Third-party solutions do exist, and I've been using Karabiner-Elements for a long time, which had a handy option to completely remove said delay - until version 12, that is.

Here is what the author has to say about Karabiner-Elements no longer offering the option:

Since v12.0.0, Karabiner-Elements's virtual keyboard acts like the real hardware keyboard. Thus, macOS ignores the short press on caps lock.

However, Karabiner-Elements still offers a way: it can hold the Caps Lock button for you, so you don't have to :) That is, a brief physical keypress will turn into a longer "virtual" one automagically.

Here is how.

First, install Karabiner-Elements and upgrade it to at least version 12.1.57 (which is beta as of the time of writing). In order to upgrade to a beta version, run Karabiner-Elements, open its Preferences screen and use the "Check for beta updates" option:


Now we need to create a "Complex Modifications" rule that maps the caps_lock key to itself, artificially held down for 100 milliseconds.

Create a text file with the content posted below and place it somewhere on your computer - for example, /tmp/remove_caps_lock_delay.json
Here is the text file content:
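A minimal rule along these lines should do the job (a sketch relying on Karabiner's hold_down_milliseconds to-event option; tweak the title and timing to taste):

{
    "title": "Remove Caps Lock delay",
    "rules": [
        {
            "description": "Remove Caps Lock delay",
            "manipulators": [
                {
                    "type": "basic",
                    "from": {
                        "key_code": "caps_lock",
                        "modifiers": { "optional": [ "any" ] }
                    },
                    "to": [
                        {
                            "key_code": "caps_lock",
                            "hold_down_milliseconds": 100
                        }
                    ]
                }
            ]
        }
    ]
}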


Now, open the following link in your Safari browser (make sure to replace /tmp/remove_caps_lock_delay.json if you have the file at another location):
karabiner://karabiner/assets/complex_modifications/import?url=file:///tmp/remove_caps_lock_delay.json

You will be prompted to import the rule into Karabiner-Elements.

After a successful import, open Karabiner-Elements Preferences screen again, visit the "Complex Modifications" tab, press the "Add rule" button, and select "Enable" to the right of our newly imported "Remove Caps Lock delay" rule:

To make the feature stable, set all timeouts to 100 milliseconds at the "Complex Modifications" -> "Parameters" screen:


That is all there is to it. No matter how briefly you press Caps Lock, Karabiner will "hold" it down long enough for macOS to recognize the keypress.

05 August 2017

TitanDB: why doesn't my index work?

The setting

One of our customers' products is a Play 2 application written in Scala on top of the Lightbend Reactive Platform. The application takes advantage of a graph database – TitanDB 1.0.0 in embedded mode, backed by Cassandra.

TitanDB allows fast and convenient queries across entities bound by relations (e.g. social networks).

The application uses Titan's Java API to manipulate and query the graph.

The problem

At some point, we started noticing that certain queries were running really sluggishly – this was accompanied by Titan log messages such as this one:

Query requires iterating over all vertices [(name = user001 AND ~label = agent)]. For better performance, use indexes

This was strange, as our application checked at start-up that the necessary indexes existed – and created them if they didn't. The code looked something like this:
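A sketch of what such a startup check can look like against the Titan 1.0 management API (the index name, vertex label and the 'name' property key here are illustrative):

import com.thinkaurelius.titan.core.TitanGraph
import com.thinkaurelius.titan.core.schema.ConsistencyModifier
import org.apache.tinkerpop.gremlin.structure.Vertex

def ensureIndex(graph: TitanGraph, indexName: String, labelName: String): Unit = {
  val management = graph.openManagement()
  if (management.getGraphIndex(indexName) == null) {
    val key = management.getPropertyKey("name")
    val label = management.getVertexLabel(labelName)
    val index = management.buildIndex(indexName, classOf[Vertex])
      .addKey(key)
      .indexOnly(label)
      .unique()
      .buildCompositeIndex()
    management.setConsistency(index, ConsistencyModifier.LOCK)
  }
  management.commit()
}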




This was a surprise: a query that was expected to use an index actually did not. For testing purposes, I manually ran a hand-crafted query that was definitely supposed to use that index and... got the same result. Interesting.

Scanning through the index documentation did not shed much light (well, there actually were hints at my problem on that page, but I missed them at the time). So what do software developers do when RTFM doesn't help? Right, Google it. I found a number of StackOverflow posts and realized that TitanDB indexes have a lifecycle. A bit of RTFS magic (read the sources :)) revealed the possible states:

    INSTALLED - The index is installed in the system but not yet registered with all instances in the cluster
    REGISTERED - The index is registered with all instances in the cluster but not (yet) enabled
    ENABLED - The index is enabled and in use
    DISABLED - The index is disabled and no longer in use

So it's possible our indexes weren't enabled. The next step was to understand the statuses of our indexes – this can be done via the Gremlin console (there is also a programmatic way to do that from Scala):

mgmt = g.getGraph().openManagement(); 
names = [ "CompositeNameSalesperson", "CompositeNameAgent" ]; // <-- all indexes are here, actually
res = names.collect { [ it , mgmt.getGraphIndex(it).getIndexStatus(mgmt.getPropertyKey('name')) ] };
mgmt.commit();
res

so we got something like this:

[
    "[CompositeNameSalesperson, ENABLED]",
    "[CompositeNameAgent, INSTALLED]"
]

Well, well, one of the indexes is not enabled (and is effectively unused). And it covers a new type of vertex, added a few days ago.

The fix

Let's postpone finding out why the index got stuck in the INSTALLED state for a while. The first order of business was to repair the index by promoting it to the ENABLED state.

Since my index is INSTALLED, the next state should be REGISTERED. Let's go:

mgmt = g.getGraph().openManagement(); 
mgmt.updateIndex(mgmt.getGraphIndex("CompositeNameAgent"), com.thinkaurelius.titan.core.schema.SchemaAction.REGISTER_INDEX); 
mgmt.commit();


Now I checked the state of the index again:

mgmt = g.getGraph().openManagement(); 
names = [ "CompositeNameAgent" ]; 
res = names.collect { [ it , mgmt.getGraphIndex(it).getIndexStatus(mgmt.getPropertyKey('name')) ] };
mgmt.commit();
res

and got:

[
    "[CompositeNameAgent, INSTALLED]"
]


What?! Why? I turned verbose logging on and got:

Some key(s) on index CompositeNameAgent do not currently have status REGISTERED: name=INSTALLED

The next iteration of Seq(documentation, Google, StackOverflow).foreach(_.read) yielded the following list of possible reasons why changing the state of the index might fail:
- open transactions
- stalled Titan instances

I was pretty sure nobody was using Titan at that moment, so I went down the second path. Fortunately, the Titan API can provide enough information:

mgmt = g.getGraph().openManagement(); 
y = mgmt.getOpenInstances();
mgmt.commit();
y;

and the response is:

[ac1100041-hostname(current), ac1100041-hostname, ac1100021-hostname]

Hmm, why are there 3 instances when I knew that only one application instance was running? Let's consult the documentation: "... TitanFactory can also be used to open an embedded Titan graph instance from within a JVM-based user application ..." – this would explain the existence of 2 instances (we use the factory twice), but what about the 3rd one?

Reading further in the documentation, I found a description of exactly my failure scenario:
However, some schema related operations - such as installing indexes - require the coordination of all Titan instances. For this reason, Titan maintains a record of all running instances. If an instance fails, i.e. is not properly shut down, Titan considers it to be active and expects its participation in cluster-wide operations which subsequently fail because this instance did not participate in or did not acknowledge the operation.
In this case, the user must manually remove the failed instance record from the cluster and then retry the operation.

Bingo! That explains why we were unable to change the status of the index. The "ac1100021-hostname" instance seems to be the black sheep here. Let's get rid of it:

toRemove = ["ac1100021-hostname"];
mgmt = g.getGraph().openManagement();  
x = mgmt.getOpenInstances();  
toRemove.collect{  mgmt.forceCloseInstance(it) };
mgmt.commit();   
y = mgmt.getOpenInstances();   
[x, y]; 


that operation returned:

[
    "[ac1100041-hostname(current), ac1100041-hostname, ac1100021-hostname]",
    "[ac1100041-hostname(current), ac1100041-hostname]"
]


Now I repeated the operations from the beginning of this section and got:

[
    "[CompositeNameAgent, REGISTERED]"
]

I did it! Finally, let's enable the index...

mgmt = g.getGraph().openManagement(); 
mgmt.updateIndex(mgmt.getGraphIndex("CompositeNameAgent"), com.thinkaurelius.titan.core.schema.SchemaAction.ENABLE_INDEX); 
mgmt.commit();

...and verify the result:

mgmt = g.getGraph().openManagement(); 
names = [ "CompositeNameAgent" ]; 
res = names.collect { [ it , mgmt.getGraphIndex(it).getIndexStatus(mgmt.getPropertyKey('name')) ] };
mgmt.commit();
res


we finally get:

[CompositeNameAgent, ENABLED]

I checked the original query (which was supposed to use the index) - and it ran as fast as the other queries. Problem solved! But wait a minute - the game is not over yet.

Root causes

So why did the index get stuck in the 'INSTALLED' state in the first place? (I also recall seeing some indexes stuck in the 'REGISTERED' state.)

I ended up with two possible reasons:

a. If an application instance (with an embedded Titan instance) crashes, it remains in Titan's bookkeeping as a cluster member – this dead instance prevents the index state from progressing, as it cannot be communicated with. The following log records can be observed in this case:

Some key(s) on index CompositeNameAgent do not currently have status ENABLED: name=REGISTERED
...
Timed out (PT1M) while waiting for index CompositeNameAgent to converge on status ENABLED

A word of advice: make sure there are no dangling Titan cluster members - clear them out prior to creating indexes (or performing any other cluster-wide operation).

b. An issue with the code which creates indexes:

val index = management.buildIndex(indexName, classOf[Vertex])
  .addKey(key)
  .indexOnly(label)
  .unique()
  .buildCompositeIndex()
management.setConsistency(index, ConsistencyModifier.LOCK)

Reading through the Titan documentation and StackOverflow, I found that this call returns as soon as the index has entered the INSTALLED state. To be sure that the index actually reaches the ENABLED state (and hence is ready to be used by queries), you have to wait for index creation to complete outside the transaction in which it was created, and make sure no other transactions interfere. So the index setup code (see the beginning of the article) was changed to:
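Roughly the following (a sketch; it leans on Titan's ManagementSystem.awaitGraphIndexStatus helper to block until the index converges on the desired status, and the names are again illustrative):

import com.thinkaurelius.titan.core.TitanGraph
import com.thinkaurelius.titan.core.schema.{ConsistencyModifier, SchemaAction, SchemaStatus}
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem
import org.apache.tinkerpop.gremlin.structure.Vertex

def createEnabledIndex(graph: TitanGraph, indexName: String, labelName: String): Unit = {
  // 1. build the index in its own management transaction
  val management = graph.openManagement()
  val key = management.getPropertyKey("name")
  val label = management.getVertexLabel(labelName)
  val index = management.buildIndex(indexName, classOf[Vertex])
    .addKey(key)
    .indexOnly(label)
    .unique()
    .buildCompositeIndex()
  management.setConsistency(index, ConsistencyModifier.LOCK)
  management.commit()

  // 2. outside that transaction, wait until all instances have registered the index
  ManagementSystem.awaitGraphIndexStatus(graph, indexName)
    .status(SchemaStatus.REGISTERED)
    .call()

  // 3. enable the index and wait until it is actually ENABLED
  val management2 = graph.openManagement()
  management2.updateIndex(management2.getGraphIndex(indexName), SchemaAction.ENABLE_INDEX)
  management2.commit()

  ManagementSystem.awaitGraphIndexStatus(graph, indexName)
    .status(SchemaStatus.ENABLED)
    .call()
}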



31 July 2017

Indoor shooting range: automated billing and admissions

In this blog entry we will share a business case of Digital Magic helping a business evolve a fledgling idea into a mature product.


The setting

A while back, an entrepreneur decided to establish an indoor shooting range. His plan was to refurbish an abandoned factory building and hire an administrator who would handle customer admissions.


The problem

When selling bullets or cartridges, it is easy to ensure that every item has been accounted for: compare the physical state of the ammo inventory to the amount of cash the company account has received since the last stocktake. Not so with "shooting time", which is far less tangible.

Simply put, the business owner wanted to ensure that shooting time is correctly inventoried.


The concept

The business owner came up with an idea – separating the two conflicting concerns:
  • admissions sales would be handled by the administrator
  • physical access to a shooting lane and customer billing would be handled completely by an automated system
The business owner then approached us – the Digital Magic team – with this idea and asked us to help make it real.


The solution

We sat together with the business owner, analysing the idea, crystallising it into a vision of an actual product.

One of the fundamental questions was how to identify the customer account with which to associate credit. Having discarded technologies such as loyalty cards, RFID tags, optical face recognition and mobile phone-based IDs, we converged on customer identification based on smart cards – to be more precise, Estonian ID cards (the business resides in Estonia, where every resident possesses such a card).

The solution was then straightforward:
  • each shooting lane would be equipped with:
    • its own lighting
    • a rail running along the ceiling allowing the target to be brought closer to or further away from the firing point
    • an electric motor controlled by a joystick that would actually move the target
    • a smart card reader
  • initially, all shooting lanes would have their lights off, and their targets brought close to the firing point, which renders shooting impossible (or at least no fun :))
  • a customer would enter the shooting range and buy a certain time allowance from the administrator
  • using a management console, the administrator would associate the credit with the customer’s account, based on his smart card ID number
  • the client would then approach a shooting lane and insert his ID smart card into that lane’s card reader
  • the system would turn the lane’s lights on, and power the electric motor joystick on so that the customer could move the target as far away from the firing point as he wishes
  • the system would track the time during which the smart card resides within the card reader
  • as soon as credit runs out (or the card gets removed), the system would turn the lane’s lights off and retract the target close to the firing range, powering the joystick down, effectively denying the customer the ability to shoot.

The design

The business owner took care of installing high-current electrics and devices: each shooting lane was now equipped with lighting, electrical motors shifting the shooting target along the rail, and motor control joysticks - all connected to a power source. Each lane could now individually be powered up or down by simply closing the corresponding pair of wires.

Actual activation and deactivation of each lane was now up to an intelligent system, which - both hardware and software - was to be designed, implemented and installed by Digital Magic.

The diagram below depicts the overall design:

Hardware

First order of business was to choose a controller that would become the bridge between the software customer account management system and the high-current hardware lane controls.

We have chosen the Laurent-112 Ethernet network relay, which has the ability to:
  • control all 7 shooting lanes (there are 12 relays on the PCB)
  • connect to Ethernet and expose an HTTP server with a simple API
The next pieces of hardware were the smart card readers - we used generic USB readers. One challenge was that the firing points were some distance away from the server room, so each reader's USB cable was extended with a 20+ meter extension cable featuring a signal repeater.

The server running the actual management application is a plain office PC running Ubuntu, too boring for its specs to be listed :) One requirement, though, was having at least 7 USB ports - one for each shooting lane. We also tried connecting all readers via a USB hub with success - the server could still discern the individual card readers - but eventually settled on the simpler hub-less solution.


Software

The application server is the brain behind the operation.

It is an application based on Play and Akka Persistence - you can read more about the latter technology in this blog post - however, the choice of frameworks did not matter much for this business case, as the project was small-scale.

The application server is hooked up as follows:
  • 7 smart card readers (installed at each shooting lane's firing point) are connected directly to the PC's 7 USB ports.
    • Ubuntu gives each USB port a name; server configuration binds each port name to its shooting lane.
  • the PC is connected to the Laurent-112 controller via Ethernet.
    • The controller can be sent simple commands via HTTP using a proprietary protocol to:
      • close a relay, activating a particular shooting lane
      • open a relay, turning that lane's lights off
      • check relay status
    • Server configuration binds each relay to its shooting lane.
The application server's main responsibility is listening to javax.smartcardio smart card events (see the sketch after this list):
  • whenever an ID card is inserted into the card reader of shooting lane N, the customer account associated with the card is checked for credit availability
    • if there is enough credit, a "close relay N" command is sent to the Laurent-112 controller, activating the corresponding lane
    • the system starts tracking passage of time with 1-second resolution for that lane.
    • as soon as customer's credit gets depleted, the application disengages the shooting lane.
  • whenever an ID card is removed from the card reader of shooting lane N, an "open relay N" command is sent to the Laurent-112 controller, deactivating the corresponding lane.
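A minimal sketch of that card-event loop (laneFor, activateLane and deactivateLane are hypothetical stand-ins for the port-to-lane configuration lookup and the Laurent-112 HTTP calls; credit tracking is left out):

import javax.smartcardio.{CardTerminal, TerminalFactory}
import scala.collection.JavaConverters._

object CardWatcher {

  // hypothetical helpers: configuration lookup and relay commands
  def laneFor(readerName: String): Option[Int] = ???
  def activateLane(lane: Int): Unit = ???   // HTTP "close relay N" to the Laurent-112
  def deactivateLane(lane: Int): Unit = ??? // HTTP "open relay N" to the Laurent-112

  def watchLane(terminal: CardTerminal, lane: Int): Unit =
    while (true) {
      terminal.waitForCardPresent(0) // block until an ID card is inserted
      activateLane(lane)
      terminal.waitForCardAbsent(0)  // block until the card is removed
      deactivateLane(lane)
    }

  def main(args: Array[String]): Unit =
    TerminalFactory.getDefault.terminals.list().asScala.foreach { terminal =>
      laneFor(terminal.getName).foreach { lane =>
        new Thread(new Runnable { def run(): Unit = watchLane(terminal, lane) }).start()
      }
    }
}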
The application server also exposes the following functionality via WEB-based management consoles:
  • a WEB UI for the shooting range administrator
    • associate credit with a customer's account
  • a WEB UI for the business owner
    • create and delete administrator accounts
    • create tariff plans
    • request billing reports:
      • hourly customer activity reports
      • summary of customer payments during a time period

The bottom line

Using billing reports generated by the system, the business owner was now able to easily inventory shooting time by comparing banking account cash flows to the amount of credit sold as registered by the system.

Problem solved.


Application server frameworks

  • Play 2
  • Akka Persistence + LevelDB backend (very light loads)
  • javax.smartcardio to interact with smart cards
  • written in Scala

Issues we've encountered

On rare occasions - e.g. sometimes after an Ubuntu server reboot - the USB port names would change (so that e.g. the first shooting lane's card reader, previously named OMNIKEY CardMan 1021 01 00, would now be named e.g. OMNIKEY CardMan 1021 06 00). The occasions were so rare that the business owner had no trouble fixing them by re-mapping ports via the WEB management console.

In order to correctly handle application crashes or server hardware failures, the Laurent-112 PCB should be configured to periodically ping the application to sense outages - in such cases, the PCB would open all relays, deactivating all shooting lanes, since at that point the application can no longer track credit. We have not done so at this particular installation, but it is perfectly possible, e.g. by running the application as a Docker container with a heartbeat port exposed: any kind of crash would render that port inaccessible.


Ideas for the future

Additional reports, such as a heat map report, which would help answer questions such as:
  • which are peak customer activity hours?
  • which are the best business hours?
  • should additional shooting lanes be built?
Also, the list of services offered by the automated system could be extended to sell not just shooting time, but also guns, ammo and targets, each with prices automatically calculated based on tariffs, the client's track record, campaigns, etc.

14 April 2016

Practical Akka Persistence

Having used Akka Persistence in a live project, we at Digital Magic have decided to write our experience down as a blog post. Here you'll find practical advice that will help you start using the framework while side-stepping the rough edges.

Familiarity with the actor model and its Akka implementation is assumed. Familiarity with Scala would help.



What does Akka Persistence get you

Akka actors lose their internal state when stopped. Quite simply, the Akka Persistence framework enables stateful actors to:
  • persist their state to a durable store of your choice
  • recover actor state from a persistent store at actor startup
For example, you can model shopping cart entities as stateful actors. In case your application crashes and gets restarted, the in-memory state of your shopping cart actors would automatically be restored by the framework to the pre-crash state; the users of the system would keep shopping as if nothing had happened.

A little theory first.

Event Sourcing

Traditionally, an application keeps a snapshot of its current state in a database. Suppose the following actions have been performed by a system user:


  • a group G was created
  • a couple of users A and B were created
  • user A was added to the group
  • user B was added to the group
  • user A was removed from the group

The resulting state (in e.g. an RDBMS) looks as follows:


The history of events that has led to this particular state is not recorded. Was user A ever a member of the group?

With event sourcing, instead of saving current application state, a sequential log of incremental changes to the state is stored. Each state change is captured in an event - an immutable fact of the past, something that has already happened. The event log is append-only, with no events ever updated or removed.

In order to reconstruct the current application state, all past events are replayed in their original order, and sequentially applied to the working state using a deterministic function {old state, event} => {new state}.
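In code, replay is simply a left fold of the event log over that application function. A schematic Scala sketch, loosely following the group example above (all types here are illustrative):

sealed trait Event
case class UserAdded(user: String) extends Event
case class UserRemoved(user: String) extends Event

// the deterministic application function {old state, event} => {new state}
case class GroupState(members: Set[String]) {
  def apply(event: Event): GroupState = event match {
    case UserAdded(u)   => copy(members = members + u)
    case UserRemoved(u) => copy(members = members - u)
  }
}

// replaying the journal reconstructs the current state...
val log = List(UserAdded("A"), UserAdded("B"), UserRemoved("A"))
val current = log.foldLeft(GroupState(Set.empty))((state, event) => state.apply(event))
// ...and the log itself answers "was user A ever a member?"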

A familiar example of a system based on event sourcing is a version control system. Every repository change is captured as a commit, and you rebuild the current state of the code base by replaying all the commits into an empty directory.

Event sourcing can also be found in real world businesses – e.g. doctors gradually appending data to your medical profile, or formal contracts being updated by the means of addendums (not striking words out from the original contract).

Advantages of event sourcing

  • With Event Sourcing, you have a complete log of every change ever applied to the system. This opens up business opportunities such as:
    • Additional data insight. Data about your business is valuable. For example, if some day in the future you decide it would be opportune to know which items your customers remove from their shopping carts right before checking out, you would be able to produce such a report retrospectively, not just "from that day forward".
    • A complete audit trail - for free.
  • Excellent write performance characteristics due to "append-only" write pattern.
  • Easy to scale the write store up because there is no write contention or locking.
  • Easy to reproduce issues for debugging purposes – just replay the event log up to a particular point in time and debug from there.
    • The stream of events up to the point of failure might even become your test case setup.

Drawbacks of event sourcing

One drawback is an increased storage capacity requirement, as data never gets deleted. But data is valuable, so one might argue whether this is a drawback at all. However, the biggest problem is that with event sourcing, it is troublesome to query data.
  • All you have is an event log. How do you answer queries such as "is user B a member of group G"? Replaying the complete log each time in order to answer a query would be very sad, indeed.
  • How do you answer queries that need data to be aggregated from several event logs?
In order to combat the log querying problems, let's take advantage of the idea of CQRS, discussed next.


CQRS

CQRS stands for Command and Query Responsibility Segregation. The idea is quite simple: you can have the same data, but different data models for reads and writes. Turn the log of events into a model more suitable for querying by projecting event store(s) into query store(s).

CQRS can be depicted as:


Advantages of CQRS


First and foremost, CQRS enables event sourcing to be practically applicable, by providing a way for data to be queried.
An important observation is that a single data model simply can not be equally appropriate for all kinds of queries as well as for writes at the same time. Trying to have a single data model fit everywhere adds inherent complexity to the system.
With CQRS, you can optimize the write and query stores independently. You might want to store the event log in a data store optimized for sequential append-only writes (such as Cassandra), while having your audit trail reside in Elasticsearch, which specializes in full-text search, and still having your documents served by MongoDB.

You might project portions of your data into a relational database and have the tables optimized for queries by giving up normal forms, not worrying about data duplication or data inconsistencies because there is only one system of record - the event log, from which the query stores may be rebuilt at any time.

Another benefit is that you can scale your read stores independently of write stores. It is typical for queries to happen orders of magnitude more often than writes.

Drawbacks of CQRS


The query side has to be prepared for its requests to be served with relaxed (eventual) consistency. This is because the projection of the write store to the query store is typically not instantaneous. However, read on about serving simple queries in a consistent manner from in-memory state.
CQRS also implies increased storage capacity requirements, as the same data resides in several data stores (and probably in non-normal forms, as well).

Akka Persistence

In short, the Akka Persistence framework is an implementation of Event Sourcing and CQRS on top of Akka.

Akka Persistence comprises the following components:
  • Persistent Actor
  • Event Journal
  • Snapshot Store
  • Akka Persistence Query (experimental API as of Akka 2.4)
  • At-least-once delivery
  • Persistent FSM (experimental API as of Akka 2.4)
Let's cover each of these separately.

Persistent Actor


If you wish your entity actor's state to be protected from loss at stop or crash, and recovered at actor startup, use the Persistent Actor interface.

The key concepts around the interface are:

  • Commands - these are imperative instructions for the actor to do something, they are plain vanilla Akka messages. A command may or may not result in a state change.
  • Events - the same concept as discussed in the Event Sourcing section. Events capture state changes, and are immutable facts of the past.
  • State - the working state of an actor is the sum of sequential application of all the events, using a deterministic application function.
Here is an example of the concepts bound together:


Suppose a user management actor is commanded to grant the administrator role to the user identified by rob@corp.com. The actor first checks whether such a user exists, whether he already has such a role, whether the acting user has enough rights to grant roles to other users, etc... In case all such checks pass, an event is created (note the past tense "role has been added"). In order to update the actor state (so that it would reflect the effects of the event), the deterministic event application function {old state, event} => {new state} is called as displayed above.

Event Journal


This concept is the same as was discussed in the Event Sourcing section. An event journal is an append-only immutable log of events. Each persistent actor has exactly one journal, identified by a persistenceId, and you must ensure that no two actors within a cluster have the same persistenceId. Persistent actors have to be cluster singletons.

The event journal can be backed by a selection of data stores, and there are a number of community plugins for RDBMSs, NoSQL and document stores, etc. Cassandra seems to be a good fit for the event journal backing store, because it is optimized for writes.
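For example, switching the journal to the akka-persistence-cassandra plugin is a matter of configuration; a minimal application.conf sketch (assuming that plugin and its default settings namespace):

# application.conf - use the Cassandra plugin as the event journal backend
akka.persistence.journal.plugin = "cassandra-journal"

# plugin-specific settings live under the "cassandra-journal" namespace
cassandra-journal.contact-points = ["127.0.0.1"]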


Let's consider a simple example of a domain model:



object Domain {
 // Commands – the external stimuli, these do not get persisted
 sealed trait Command
 case class Deposit(amount: Int) extends Command
 case class Withdraw(amount: Int) extends Command

 // Events - reflect effects that have already happened
 sealed trait Event
 case class BalanceChanged(delta: Int) extends Event

 // State - reflection of a series of events
 case class Account(balance: Int) {
   require(balance >= 0)

   // the deterministic event application function
   def apply(event: Event): Account = event match {
     case BalanceChanged(delta) =>
       Account(balance + delta)
   }
 }
}



Actors who would like their state persisted have to inherit the PersistentActor trait:

import akka.persistence.PersistentActor

class Account(val id: String) extends PersistentActor {

  // this identifies the persistent journal and
  // does not change between actor incarnations
  override val persistenceId = s"Account.$id"

  var state = Domain.Account(balance = 0)

  // receives events during actor startup
  override def receiveRecover: Receive = {
    case ev: Domain.Event =>
      state = state.apply(ev)
  }

  // receives commands during normal operation
  override def receiveCommand: Receive = {
    case Domain.Withdraw(amount) =>
      if (state.balance < amount) {
        sender() ! "not enough funds"
      } else {
        // the persist callback is asynchronous, but Akka guarantees
        // it is safe to access and update the state from within it
        persist(Domain.BalanceChanged(-amount)) { changed =>
          // the only safe place to apply the state mutation caused by the event
          state = state.apply(changed)
          sender() ! s"successfully withdrawn $amount"
        }

        // WRONG: state.balance is unpredictable here - the persist
        // callback above may not have run yet
        // sender() ! state.balance
      }
  }
}



About actor state


The Akka persistence API does not specify the way an actor’s state is to be kept. A good choice is to keep an actor’s state in a single variable, because:
  • when you start using snapshots it’d be easier to write the whole state in one shot
  • it’s easy to forget to update something if state is scattered over a dozen var variables or behaviour parameters
It’s a good idea to split an actor state into two parts: persistent and transient. The transient part must be reconstructable from the persistent part at any moment. An example is a dictionary of something (id -> object) and a reverse index (object -> id). During recovery, it is advised to reconstruct transient state only after recovery completion, to save up on computational resources.



Snapshot Store


State snapshots are completely optional. A snapshot captures an actor’s current internal state. Snapshots can periodically be written out to bound recovery times of long journals.

For example, suppose an actor has written 3 events, then a snapshot of the state reflecting those 3 events, and then two more events:


In this case, upon actor startup, state recovery starts off with the latest snapshot, not an empty state, so only the events 4 and 5 will have to be read and applied.



Generally, here's how an actor is started up:


  • Run constructor, set initial state to empty
  • Receive latest snapshot, if any, and replace the initial state
  • Reconstruct the current actor state:
    • if there was a snapshot, apply only those events coming after it
    • if there are no snapshots, apply all events
    • in both cases, the resulting state is the same
  • Any incoming messages are held (stashed) until recovery completion

There is, of course, also an API for an actor to save snapshots of its current state, and to receive a snapshot during recovery. The framework does not enforce any rules on when and how to write snapshots, it only provides the API:

override def receiveCommand: Receive = {
  case cmd: Command =>
    persist(Event.from(cmd)) { ev =>
      // change the state now that the event is persisted
      state = state.process(ev)
      if (SnapshotHelper.isTimeToSnapshot) {
        // trigger a snapshot write
        self ! SaveSnapshot
      }
    }
  case SaveSnapshot =>
    saveSnapshot(state)
  case SaveSnapshotSuccess(metadata) =>
    // retain only the latest snapshot to ease the load on the database
    val criteria = SnapshotSelectionCriteria(maxSequenceNr = metadata.sequenceNr - 1)
    deleteSnapshots(criteria)
  case SaveSnapshotFailure(metadata, reason) =>
    // not fatal - just log an error
}

Note that you can also maintain a journal of commands (in addition to saving events), but this is optional – it can be useful for getting a sense of how the system is used, and for debugging or testing.

The snapshot store can be backed by a selection of data stores, and there are a number of community plugins.

Persistence Query


Akka Persistence Query is what implements the query side of CQRS. The API provides a stream of journaled events (an Akka Streams stream). The data stream is transformed and redirected to the query store for querying later.

Note that there still is a single system of record - the event log. Query stores can be deleted and re-populated at any time with no data loss.
Note here that the same event journal can be projected to several query stores, and several event journals can be projected to the same query store.

Let's consider a practical example. We want to be able to query an audit trail of all modifications done to our system. The easiest way is to stream all events to an Elasticsearch index, so that the index could later be queried using Kibana.

Note that Elasticsearch may reject or lose writes (due to e.g. field mapping conflicts or network partitioning), but these durability issues do not mean we should avoid using it – just don’t use it as a primary DB, and have the index rebuilt in case of its corruption. A perfect task for Akka Persistence Query, which is going to facilitate the projection of our events to Elasticsearch.

Akka Persistence Query itself is not the query side of an application (it does not serve queries) – it is merely a transport facilitating migration of events from the write store (Cassandra in our case) to the actual query store (Elasticsearch in our case).

Read journal implementations typically have an API to provide a typed stream of events, which can then be transformed by Akka Streams combinators (to e.g. publish events to the query store).

Let’s examine a complete example of an event publisher to Elasticsearch. The publisher uses Akka Persistence Query to evoke an Akka Streams stream of events from many event logs, but it also is a PersistentActor itself because it needs to implement a resumable projection (memorize the last event published to start where it has left off the last time).



import java.util.UUID

import akka.Done
import akka.pattern.ask
import akka.persistence.cassandra.query.scaladsl
import akka.persistence.query.PersistenceQuery
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

// the command asking the publisher to memorize an offset,
// and the corresponding persisted event (names are purely illustrative)
case class SetLastOffsetWritten(offset: UUID)
case class LastOffsetWritten(offset: UUID)

class Publisher extends PersistentActor {

  override val persistenceId = "elasticsearch-publisher"

  // needed by the ask pattern and the stream below
  implicit val materializer = ActorMaterializer()
  implicit val askTimeout = Timeout(5.seconds)
  import context.dispatcher

  // memorize the last offset written
  var state: Option[UUID] = None

  // receives events during actor startup; streaming starts once recovery is done
  override def receiveRecover: Receive = {
    case LastOffsetWritten(offset) => state = Some(offset)
    case RecoveryCompleted         => engageStreaming()
  }

  // persists the 'last offset written' facts coming from the stream below
  override def receiveCommand: Receive = {
    case SetLastOffsetWritten(offset) =>
      persist(LastOffsetWritten(offset)) { ev =>
        state = Some(ev.offset)
        sender() ! Done // unblock the stream stage that asked
      }
  }

  def engageStreaming(): Unit = {

    // by default the stream will terminate upon all exceptions
    val readJournal = PersistenceQuery(context.system)
      // obtain a read journal
      .readJournalFor[scaladsl.CassandraReadJournal]("cassandra-query-journal")

    // start streaming from the last offset written
    val initialUuid = state.getOrElse(readJournal.firstOffset)

    // events to be published have to be tagged with "publish_me"
    // using an Event Adapter during the write flow;
    // eventsByTag gets you an endless stream of typed events
    readJournal.eventsByTag("publish_me", initialUuid)
      // use an Akka Streams combinator to publish 4 events in parallel
      .mapAsync(4) { envelope =>

        // the write to Elasticsearch has to be idempotent,
        // because a failed publishing attempt might be re-run;
        // achieve this by generating a stable id for this event
        val id = s"${envelope.persistenceId}.${envelope.sequenceNr}"

        val elasticFuture = envelope.event match {
          case e: Event => Future { // Event: your domain event trait
            // asynchronously write event e to Elasticsearch under id
            ElasticSearchWriter.write(e, id)
          }
        }

        // once the event is written, map to its offset
        elasticFuture.map(_ => envelope.offset)
      }
      .mapAsync(1) { offset =>
        // persist the 'last offset written' prior to moving forward,
        // handled by receiveCommand above;
        // note that the ask pattern yields a Future[Any]
        self ? SetLastOffsetWritten(offset)
      }
      .runWith(Sink.ignore)
      .recover { case e =>
        // handle errors by shutting the actor down
        context.stop(self)
      }
  }
}

A few words regarding the publisher code above:

  • The publisher actor is a Persistent Actor itself - it memorizes the events that have already been streamed to and acknowledged by Elasticsearch, and does not stream them again (a resumable projection).
  • In case the publisher crashes with some events being in flight to/from Elasticsearch, those events would be re-sent after recovery. However, Elasticsearch document IDs are based on event sequence numbers that are stable and unique for each event, so the processing at Elasticsearch is effectively idempotent.
  • This means that events are delivered to Elasticsearch in effectively an exactly-once manner.

Note that not all journals support the eventsByTag call – without it, it is much more difficult (though not impossible) to implement a resumable projection. For example, Cassandra journal 2.x does not support it, while Cassandra 3.x does.

Basically you can have as many different materialized projections as you want, and all of them can use different data models and different db engines. Most systems actually need to have more than one read model to work well.

You can even project a projection if you so choose.

As was mentioned above, it is important to note that read consistency is eventual: it takes a little while (seconds) for persisted events to get into the projected stream. Most of the time, queries can tolerate that, but not always. Where consistency is important, use the actor's in-memory state to serve simple queries (for example, "check whether an email is available and bind it to the account if it is").

At least once message delivery


Standard Akka provides the following guarantees:
  • message delivery guarantee: at-most-once 
  • ordering guarantees: peer to peer 
Imagine that at the moment of an application crash, your actor was waiting for a response to a message it had sent to another actor earlier. During recovery, both actors would be brought back up to their current state, but the request would not automatically be re-sent, and the sending actor would be waiting for a response that will never arrive.

Akka Persistence offers a reliable message delivery API via the AtLeastOnceDelivery trait, which takes care of re-sending messages when they have not been confirmed within a timeout.

The key concept here is that the sender persists the following facts (in form of events, of course):
  • message has been sent to recipient
  • message has been acknowledged by the recipient
During actor startup (e.g. after a crash), the framework will prompt the persistent actor to re-send any "sent" yet "non-acknowledged" messages.

A few words about the communication protocol:
  • The recipient has to send message acknowledgements.
  • The recipient has to:
    • process messages in an idempotent manner, or
    • have facilities to detect and handle duplicate messages
  • Message ordering is no longer guaranteed due to re-transmissions
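A minimal sketch of the sender side (it follows the shape of the standard AtLeastOnceDelivery example; the message and event names are illustrative):

import akka.actor.ActorSelection
import akka.persistence.{AtLeastOnceDelivery, PersistentActor}

case class Msg(deliveryId: Long, payload: String)
case class Confirm(deliveryId: Long)

sealed trait Evt
case class MsgSent(payload: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt

class ReliableSender(destination: ActorSelection) extends PersistentActor with AtLeastOnceDelivery {

  override val persistenceId = "reliable-sender"

  override def receiveCommand: Receive = {
    case payload: String =>
      // persist the "message has been sent" fact, then deliver
      persist(MsgSent(payload))(updateState)
    case Confirm(deliveryId) =>
      // persist the "message has been acknowledged" fact
      persist(MsgConfirmed(deliveryId))(updateState)
  }

  override def receiveRecover: Receive = {
    // replaying these events re-sends anything sent but not yet confirmed
    case evt: Evt => updateState(evt)
  }

  def updateState(evt: Evt): Unit = evt match {
    case MsgSent(payload)         => deliver(destination)(deliveryId => Msg(deliveryId, payload))
    case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId)
  }
}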

An example system


Depicted below is an architecture taking advantage of most of the things discussed so far.


Write journals are backed by Cassandra, as it is a data store optimized for writes.

A query store is backed by Elasticsearch, which gets populated by events from several event journals by means of Akka Persistence Queries.
  • Event log projection to Elasticsearch is resumable - the publisher memorizes the "last published" position within the events stream using Akka Persistence.
  • Message delivery to Elasticsearch is "exactly-once" due to stable document IDs and idempotent processing of them at Elasticsearch.
 There are two read models:
  1. Simple queries requiring a consistent view of the state are served by actors themselves from their in-memory state.
  2. Complex queries (requiring data aggregation) tolerant to relaxed consistency are served by Elasticsearch.
The in-memory state can be considered a simple form of a read store. This architecture is called MemoryImage, in which the current state is kept in and queried from main memory. Naturally, your current state has to fit into memory.

The benefits of an in-memory image are as follows:
  • you don’t need to keep the application state in an up-to-date persistent store, and keep them in sync
    • in case of an application crash, the in-memory state gets restored from the journal 
  • you get high performance, since queries are served from in-memory state with no IO or remote calls to database systems
  • you get rid of database mapping code

Failures

A persistent actor gets unconditionally stopped in case:
  • persistence of an event fails
  • state recovery during actor startup fails
This is done in order to avoid data inconsistencies.

This means that the Akka supervision hierarchy you are familiar with no longer works as expected - the actor has to be restarted not only after a crash, but also after a stop.

Additionally, in case the problem lies in database connectivity, it would be reasonable to wait for a bit prior to re-creating the actor (and retrying recovery) to avoid the "thundering herd" problem.


In order to handle all of the above, consider the following explicit supervisor:



import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}

import scala.concurrent.duration._
import scala.util.control.NonFatal

// restart the child actor on any non-fatal java.lang.Throwable,
// not just on java.lang.Exception as the default decider does
val childStrategy = OneForOneStrategy()(SupervisorStrategy.defaultDecider.orElse {
  case NonFatal(_) => SupervisorStrategy.Restart
})

val backoffSupervisorProps = BackoffSupervisor.props(
  Backoff.onStop(
    actorProps, // your wrapped (supervised) actor props
    childName = actorName + "Supervised",
    minBackoff = 3.seconds,
    maxBackoff = 10.seconds,
    randomFactor = 0.2)
    .withSupervisorStrategy(childStrategy))

context.actorOf(backoffSupervisorProps, actorName)

Created this way, an actor would get restarted according to the specified supervision strategy after a crash, and it would also get restarted after having been stopped by the Akka Persistence framework, after a backoff period.


Serialization

By default, Java serialization is used by Akka Persistence when storing your events. This is not advised mainly because it does not handle schema evolution well (and it is rumored to be quite slow). It’s best to serialize events as generic data structures such as maps and lists, with as little class information as possible.
We serialize our events to JSON because:


  • JSON makes it easier to handle schema evolution. For example, removing fields or adding fields that have a default value is basically free.
  • it's human-readable (easier to debug)  
  • we use the same serialization engine for both persisting events to Cassandra and publishing events to Elasticsearch  
  • no vendor lock-in (unlike Kryo or Scala Pickling)
  • using spray-json + spray-json-shapeless allows us to keep boilerplate to a minimum (no schema needed, a lot of built-in support for Scala types, and whole type hierarchies automatically get serialized by the json-shapeless library)
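Wiring a custom serializer in is then a matter of Akka configuration; a sketch (the serializer class and event trait names are hypothetical):

# application.conf - route domain events to a custom JSON serializer
# (com.example.EventJsonSerializer and com.example.Event are hypothetical names)
akka.actor {
  serializers {
    event-json = "com.example.EventJsonSerializer"
  }
  serialization-bindings {
    "com.example.Event" = event-json
  }
}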



However there is a price to pay: performance. For example, JSON serialization is a major CPU consumer during recovery:


When serializing your state for snapshots, Java serialization is also used by default. Here it might be reasonable to just use the fastest serialization available, because in case the types of your internal state change, the snapshot could be simply dropped and rebuilt from events by replaying the whole stream again.


Persistent FSM

If you wish your persistent actors to process commands in an FSM-like fashion, Akka provides the PersistentFSM API.

Consider an actor representing a digital lock that can be unlocked by entering a combination of digits. The lock accepts the following commands (not persisted): 



sealed trait Command 
case object Lock extends Command 
case class EnterDigit(digit: Digit) extends Command 
case object TryUnlock extends Command

The lock can be in one of the following states (persisted):
sealed trait LockState extends FSMState

case object Locked extends LockState {
  override def identifier: String = "Locked"
}

case object Unlocked extends LockState {
  override def identifier: String = "Unlocked"
}

Any actions performed with the lock are recorded as a sequence of "domain events", which are persisted. Those events are replayed on an actor's startup in order to restore the latest state:



// note the past tense
sealed trait DomainEvent
case object ComboCleared extends DomainEvent
case class DigitEntered(digit: Digit) extends DomainEvent
case class IntrusionDetected(data: Seq[Digit]) extends DomainEvent

The lock state represents the sequence of currently entered digits (not persisted, reconstructed on recovery): 
Seq[Digit]

The following DSL binds everything up into an FSM:


startWith(Unlocked, Seq())

when(Unlocked) {
  case Event(Lock, _) =>
    // persist only the state change
    goto(Locked) andThen {
      // perform a side effect once the state change is persisted
      case _ => reportActor ! "the lock is secure"
    }
}

when(Locked) {
  case Event(EnterDigit(digit), currentCombo) =>
    // persist the DigitEntered event only
    stay applying DigitEntered(digit)
  case Event(TryUnlock, currentCombo) =>
    if (currentCombo == SECRET_COMBO) {
      goto(Unlocked) applying ComboCleared replying "welcome to the realm"
    } else {
      // persist the ComboCleared and IntrusionDetected events
      stay applying (ComboCleared, IntrusionDetected(currentCombo))
    }
}

The following function constructs the state data from events:


override def applyEvent(event: DomainEvent, stateBeforeEvent: Seq[Digit]): Seq[Digit] =
  event match {
    case ComboCleared               => Seq()
    case DigitEntered(digit: Digit) => stateBeforeEvent :+ digit
    case IntrusionDetected(_)       => stateBeforeEvent // recorded for audit purposes only
  }

During recovery, the actor state is recovered automatically, and state data is recovered by repeatedly calling the applyEvent function above for each event persisted.

We’ve experienced issues with the API though:


  • Built-in state forMax()  timers have a bug (#19688) - they work incorrectly on recovery. Use explicit timers as a work-around.
  • We have not found a way to use snapshots for FSM, so FSM actors should not have long journals.
  • No built-in analogue for the ‘at least once delivery guarantee’ kind of API discussed above
  • No support for transient states (e.g. states to which you don’t want to recover).


Persistent actor in a cluster

A persistent actor must be a cluster singleton (a unique persistenceId per cluster): at all times, only one actor should write events into a given journal. If you have a cluster, but all your persistent actors fit into the memory of a single machine, you can use the ClusterSingletonManager, which does its best to ensure there is only one instance of the persistent actor, located on the oldest node.
If your persistent actors do not fit onto the same JVM, you can use the ClusterSharding manager to spread persistent actors evenly across the cluster.
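For the singleton case, a sketch of the wiring (it assumes the Account actor from the earlier example, the akka-cluster-tools module and a cluster-enabled ActorSystem; the names are illustrative):

import akka.actor.{ActorSystem, PoisonPill, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

val system = ActorSystem("shop")

// at most one instance of this persistent actor runs in the whole cluster,
// hosted on the oldest node and re-created elsewhere if that node leaves
system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = Props(classOf[Account], "42"),
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)),
  name = "account-42")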

Performance considerations

An application using Akka persistent actors was tested on the following setup:
  • Storage (Journal/Snapshots): Cassandra 3.3 - m3.medium (AWS) - 3-node cluster
  • Application: Akka 2.4.2, Play 2.4 - c4.large (AWS)
  • Total number of journaled events: ~1.7M (result of ~1.5 day under 200 req/sec load)
The table below depicts the effects of Akka Persistence on normal system functioning. In short, event persistence has little impact, but the more often snapshots are written, the higher the resource consumption:

                                          CPU, %    Transmit, KB/s    Avg GC pause, ms
Nothing gets persisted                    79        185               3..4
Events get persisted, but no snapshots    79        195               6
Snapshots every 10K events                ~81       200               5
Snapshots every 200 events                ~84       320               8
The table below depicts the effects of Akka Persistence on actor recovery. In short, snapshot frequency directly translates to actor recovery time improvements:
                    Recovery time    Total events recovered
No snapshots        5m 21s           1700K
Every 10K events    36s              83K
Every 200 events    11s              12K
To summarize: in our particular application, event persistence is lightweight, but snapshotting is not.
However, snapshots do significantly reduce recovery times.