Example – stream, producer, and consumer

This example demonstrates the basic building blocks for implementing event streaming with cloud-native services. As depicted in the diagram at the top of the pattern, a producer publishes events to the stream (AWS Kinesis) and stream processors (AWS Lambda) consume and process the events. The following is a fragment of an AWS CloudFormation resource from a Serverless Framework serverless.yml file. In Chapter 6, Deployment, we will discuss how this fits into a continuous integration and deployment pipeline. The fragment demonstrates that provisioning a cloud-native event stream, such as AWS Kinesis, is completely declarative and largely boilerplate, and thus has a very low barrier to entry.

    Resources:
      ExampleStream:
        Type: AWS::Kinesis::Stream
        Properties:
          Name: exampleStream
          RetentionPeriodHours: 24
          ShardCount: 1

The following example producer code publishes an item-submitted event to an AWS Kinesis stream. The domain entity is wrapped in a standard event envelope that at a minimum specifies an id, type, and timestamp. Then the event is wrapped in the parameters needed for the cloud provider's SDK and published.

    import aws from 'aws-sdk'; // assumed imports (AWS SDK v2); the original fragment omits them
    import uuid from 'uuid';   // assumes the uuid package's legacy default export

    // The domain entity uses a random, version 4 UUID
    const item = {
      id: uuid.v4(),
      name: 'Cloud Native Development Patterns and Best Practices'
    };

    // The event envelope uses a timestamp-based, version 1 UUID
    const event = {
      id: uuid.v1(),
      type: 'item-submitted',
      timestamp: Date.now(),
      item: item
    };

    const params = {
      StreamName: process.env.STREAM_NAME,
      PartitionKey: item.id, // partition on the domain entity's ID
      Data: Buffer.from(JSON.stringify(event)),
    };

    const kinesis = new aws.Kinesis();
    return kinesis.putRecord(params).promise();

There are several important things to note here. First is the use of two types of UUIDs. The domain entity uses a version 4 UUID, while the event uses a version 1 UUID, and the domain entity's ID is used as the partition key when sending the event to the stream. Version 1 UUIDs are based on a timestamp, while version 4 UUIDs are based on a random number. Events are unique facts that occur at a specific point in time, so a timestamp-based UUID makes the most sense. This also means that event IDs have a natural sort order, which is beneficial when storing and indexing events in a search engine, as we will discuss in the Data Lake pattern. However, the same does not hold true when storing events in a stream. Instead, we use the domain entity's random number-based UUID as the partition key. This increases the likelihood of an even distribution of events across the shards of the stream.

Conversely, if we used a timestamp-based UUID for the partition key, then all the events in a given time period would hash to a single hot shard. Each shard is paired with an instance of the function, so we maximize throughput by evenly distributing the events across the shards, which invokes multiple function instances in parallel. Furthermore, using the ID of the domain entity ensures that all events for a given domain entity are processed by the same shard, in the order in which they arrive. This example sends a single event to the stream at a time; in practice, we typically publish events in batches to increase throughput, as sketched below.
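
The following is a minimal sketch, not part of the original example, of how a batch of events could be published with the SDK's putRecords call. It assumes an events array of envelopes like the one shown above; note that putRecords can partially fail, so a production producer would also inspect the FailedRecordCount in the response.

    import aws from 'aws-sdk';

    const kinesis = new aws.Kinesis();

    // events is assumed to be an array of event envelopes like the one above
    const publishBatch = (events) => {
      const params = {
        StreamName: process.env.STREAM_NAME,
        Records: events.map(e => ({
          Data: Buffer.from(JSON.stringify(e)),
          PartitionKey: e.item.id, // still partition on the domain entity's ID
        })),
      };
      // putRecords can partially fail; check FailedRecordCount in the response
      return kinesis.putRecords(params).promise();
    };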

The following fragment from a Serverless Framework serverless.yml file demonstrates that provisioning a consumer function is completely declarative and largely boilerplate and thus lowers the barrier to entry for stream processing. The batch size is one of the main dials for tuning stream processing. We ultimately want to tune the batch size in the context of the stream processing logic, the memory allocation, and the function timeout.

    functions:
      consumer:
        handler: handler.consume
        events:
          - stream:
              type: kinesis
              arn:
                Fn::GetAtt:
                  - ExampleStream
                  - Arn
              batchSize: 100
              startingPosition: TRIM_HORIZON

Next, we have an example of the stream processor itself. This example is kept simple to demonstrate the basic building blocks, but we can implement some very interesting logic in stream processors. The first thing to note is that we are using the functional reactive programming model, because it matches well with the fact that we are processing micro-batches of events. Our batch size is set to 100, so we will receive up to 100 events in a single function invocation. If there are 100 or fewer events in the stream, the function receives all the events currently in the stream; if there are more than 100, the function receives only 100 of them.

The main block of code sets up a pipeline of multiple steps, through which each event in the micro-batch contained in the event.Records array flows individually. The first step parses the event object from the record. Next, we filter out all events except the item-submitted events. The stream will contain many different types of events, and not all consumers are interested in all of them, so these smart endpoints weed out unwanted events upfront. Finally, we just print the event to the log, but ultimately a stream processor saves its results to a cloud-native database or produces more events; a sketch of the former follows the example. The function callback is invoked once all the events in the micro-batch have flowed through the pipeline. We will discuss error handling and other advanced stream processing features in the Stream Circuit Breaker pattern.

    import _ from 'highland'; // assumed import; the functional reactive stream library used here

    export const consume = (event, context, cb) => {
      _(event.Records)
        .map(recordToEvent)
        .filter(forItemSubmitted)
        .tap(print)
        .collect().toCallback(cb);
    };

    // Decode and parse the Kinesis record payload into the event envelope
    const recordToEvent = r =>
      JSON.parse(Buffer.from(r.kinesis.data, 'base64'));

    // Only process item-submitted events; ignore everything else in the stream
    const forItemSubmitted = e => e.type === 'item-submitted';

    const print = e => console.log('event: %j', e);
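
For instance, instead of just printing, the processor could persist each item to a cloud-native database. The following is a minimal sketch, not part of the original example, of writing each item to a DynamoDB table with the AWS SDK's DocumentClient; the TABLE_NAME variable and the item shape are assumptions for illustration.

    import aws from 'aws-sdk';
    import _ from 'highland';

    const db = new aws.DynamoDB.DocumentClient();

    // Wrap the put promise in a stream so it can be flattened into the pipeline
    const save = e => _(db.put({
      TableName: process.env.TABLE_NAME, // hypothetical table name
      Item: { id: e.item.id, name: e.item.name, lastEventId: e.id },
    }).promise());

    // The same helpers as in the example above
    const recordToEvent = r => JSON.parse(Buffer.from(r.kinesis.data, 'base64'));
    const forItemSubmitted = e => e.type === 'item-submitted';

    export const consume = (event, context, cb) => {
      _(event.Records)
        .map(recordToEvent)
        .filter(forItemSubmitted)
        .flatMap(save) // persist each event's item before completing the batch
        .collect().toCallback(cb);
    };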