Reactive Streams and DynamoDB

Introduction

In this blog post, I'm going to demonstrate how Reactive Streams can allow us to elegantly work with DynamoDB, a key-value data store with a predetermined write capacity.

DynamoDB

DynamoDB is a NoSQL database managed by Amazon; see the Amazon DynamoDB Documentation for details.

The performance characteristics and client behaviour of DynamoDB are very different from those of traditional data stores such as relational databases. With a relational database, performance tends to degrade gradually as load increases. DynamoDB tables instead have a configurable read and write capacity, specified as the number of reads / writes per second the table will accept, and AWS charges based on the provisioned capacity. Exceed this limit and DynamoDB will reject the read / write. If using the AWS Java client, this failure is represented as a ProvisionedThroughputExceededException.



The Job

Our process is a batch job which will write approximately 10 million records to a DynamoDB table. The source of the records is an Iterator[Property] (let's say we are writing documents with some information about Australian properties).

val sourcePropertyIter: Iterator[Property] = .... // how the source is obtained is not relevant to this discussion

Ideally the job will complete in under an hour, so while the job executes we will configure the DynamoDB table with a write capacity of 3000 (10,000,000 records / 60 minutes / 60 seconds ≈ 2778, which I’ve rounded up to 3000).
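The arithmetic above can be written down as a small helper. This is only a sketch: the object and function names are made up for illustration, and it assumes every record fits within a single write capacity unit (DynamoDB counts one unit per write of an item up to 1 KB), so larger items would need proportionally more capacity.

```scala
object CapacityEstimate {
  /** Smallest whole number of writes per second that completes `records` writes within `seconds`. */
  def requiredWritesPerSecond(records: Long, seconds: Long): Long = {
    require(records >= 0 && seconds > 0, "records must be non-negative and seconds positive")
    (records + seconds - 1) / seconds // integer ceiling division
  }
}
```

For the job described here, CapacityEstimate.requiredWritesPerSecond(10000000L, 60L * 60L) gives 2778, which is then rounded up to 3000.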

So before the job starts writing records, it invokes:

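import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput
val writesPerSecond = 3000
val dynamoDbClient = new AmazonDynamoDBAsyncClient()
// withWriteCapacityUnits takes a java.lang.Long, hence the explicit .toLong
dynamoDbClient.updateTable(config.tableName, new ProvisionedThroughput().withWriteCapacityUnits(writesPerSecond.toLong))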

And after the job completes, we can reset the capacity to save on AWS costs.

// Batch job magic goes here!

// Batch job has completed
dynamoDbClient.updateTable(config.tableName, new ProvisionedThroughput().withWriteCapacityUnits(1L))

So what about that batch job magic in the middle?

A legacy Java implementation

A traditional Java implementation might read the records from the Iterator and hand them off to an ExecutorService backed by a thread pool. Those threads will attempt to process the jobs as fast as possible, so the client may exceed the write capacity imposed by DynamoDB. When a client receives a ProvisionedThroughputExceededException, it needs to try again after a short delay, typically implemented with Thread.sleep(someDelay). The AWS docs have some example Java code showing one strategy for this, under Error Retries and Exponential Backoff in AWS.
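To make the retry-and-sleep pattern concrete, here is a minimal blocking sketch of it. It is written in Scala rather than Java to match the rest of this post, and it is not the code from the AWS docs: the object, function and parameter names are all invented for illustration. In a real job, isThrottled would check for ProvisionedThroughputExceededException and write would perform the DynamoDB put.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object BlockingRetry {
  /** Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `maxDelayMs`. */
  def backoffDelayMs(attempt: Int, baseDelayMs: Long, maxDelayMs: Long): Long =
    math.min(maxDelayMs, baseDelayMs << math.min(attempt, 20))

  /** Runs `write`, sleeping and retrying for as long as `isThrottled` matches the failure. */
  @tailrec
  def retryWithBackoff[A](maxRetries: Int, baseDelayMs: Long, maxDelayMs: Long, attempt: Int = 0)(
      isThrottled: Throwable => Boolean)(write: () => A): A =
    Try(write()) match {
      case Success(result) => result
      case Failure(e) if isThrottled(e) && attempt < maxRetries =>
        // The worker thread does nothing useful while it waits here.
        Thread.sleep(backoffDelayMs(attempt, baseDelayMs, maxDelayMs))
        retryWithBackoff(maxRetries, baseDelayMs, maxDelayMs, attempt + 1)(isThrottled)(write)
      case Failure(e) => throw e // not a throttling error, or out of retries
    }
}
```

Every worker in the thread pool would wrap its write in a call like this, which means the retry policy and the sleeping are scattered across all of the workers.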

But this is horrible for a number of reasons:

  • Thread.sleep calls highlight underutilised threads which could potentially be doing something useful (in this specific example, there is no impact)
  • Why are we throwing more requests at an API than we have configured it to accept?
  • What value do we use for the sleep to ensure that the client is still utilising the full write capacity?

Reactive streams

http://www.reactive-streams.org/

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

Rather than handling failures because the job exceeded the write capacity, let's avoid those failures in the first place.

This example uses the Monix implementation of reactive streams.

Monix provides a Task data type for representing asynchronous computations. Read the amazing Monix docs to see the benefits over Scala Futures.

I've defined a function which is responsible for saving a single property to DynamoDB. There is some boilerplate to convert from a Java Future to a Monix Task, but I'll skip over that for now.

def saveProperty(property: Property): Task[Unit] = {
   someMagicForLiftingAJavaFutureIntoATask {
      val requestData = convertToJavaHashMap(property)
      dynamoDbClient.putItemAsync("invest-table", requestData)
   }
}
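For the curious, the lifting step can be sketched with the standard library alone. The Handler trait below is a hypothetical stand-in for a callback interface such as the AWS SDK's AsyncHandler, and toFuture is an illustrative name, not part of any library. The idea is to complete a Promise from the callback and expose it as a Scala Future. Wrapping that Future in a Task should then be possible with something like Task.defer(Task.fromFuture(...)), though the exact helper depends on the Monix version in use.

```scala
import scala.concurrent.{Future, Promise}

object CallbackBridge {
  /** Hypothetical stand-in for a callback interface such as the AWS SDK's AsyncHandler. */
  trait Handler[A] {
    def onSuccess(result: A): Unit
    def onError(error: Exception): Unit
  }

  /** Starts a callback-based call and exposes its outcome as a Scala Future. */
  def toFuture[A](start: Handler[A] => Unit): Future[A] = {
    val promise = Promise[A]()
    start(new Handler[A] {
      def onSuccess(result: A): Unit = { promise.trySuccess(result); () }
      def onError(error: Exception): Unit = { promise.tryFailure(error); () }
    })
    promise.future
  }
}
```

Deferring the call matters because a Task is lazy: the write should only start when the stream actually asks for it, not when saveProperty is called.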

We can now define a Consumer which is responsible for writing multiple properties to DynamoDB concurrently.

val parallelConsumer = Consumer.foreachParallelAsync(20)(saveProperty)

The “20” is the maximum number of concurrent saveProperty calls.

We then configure the reactive stream (an Observable) to emit at most 3000 requests per second.

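import scala.concurrent.duration._ // enables the `1.second` syntax

val ingestTask = Observable.fromIterator(sourcePropertyIter)
  .bufferTimedWithPressure(1.second, writesPerSecond) // release at most writesPerSecond items per second, back-pressuring the source
  .concatMap(Observable.fromIterable) // flatten each buffered batch back into individual items
  .runWith(parallelConsumer)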
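If the throttling step feels opaque, the following standard-library sketch approximates what it achieves: the source is pulled lazily in batches of at most maxPerPeriod items, with a pause between batches. It is a rough, blocking illustration of the semantics only and not how Monix implements it (Monix does this without blocking a thread). ThrottleSketch, throttled and the pause parameter are names invented for this example.

```scala
object ThrottleSketch {
  /** Feeds `source` to `handle` in batches of at most `maxPerPeriod`, calling `pause()` between batches. */
  def throttled[A](source: Iterator[A], maxPerPeriod: Int)(pause: () => Unit)(handle: Seq[A] => Unit): Int = {
    var batches = 0
    source.grouped(maxPerPeriod).foreach { batch =>
      if (batches > 0) pause() // wait for the next period before releasing more items
      handle(batch)
      batches += 1
    }
    batches // number of batches released
  }
}
```

Passing () => Thread.sleep(1000) as the pause gives roughly one batch per second. Because the iterator is only advanced when the next batch is due, the producer never runs ahead of the configured rate, which is the essence of back pressure.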

While this example used Monix, other reactive implementations expose similar functionality. For example, Akka Streams provides a throttle operator, which can even be used to control throughput where each unit in the stream has a custom "cost" involved with processing.

Conclusion

Our data pipelines expressed in terms of Reactive Streams are elegant, concise and defined in a single place. This makes them very easy to reason about. When I compare this to our older Java codebases, the logic expressing the processing steps and related concurrency was spread throughout the codebase. Reasoning about the entire processing pipeline required much more mental effort.

Threading and concurrency primitives are not the easiest to work with, but we are very fortunate to now have an amazing selection of Reactive libraries that provide great abstractions over those primitives. Make use of them!