Architecting Cloud Native Applications
Kamal Arora, Erik Farr, John Gilbert, Piyum Zonooz
Example – event sourced join
In this example, we leverage Event Sourcing and ACID 2.0 to create views that join and group data from multiple event types emitted by multiple components. The events can arrive out of order and the logic is idempotent. Specifically, we want to asynchronously join the user-created, user-loggedIn, and order-submitted event types and produce a view that aggregates the user ID, username, last login timestamp, and the count of recent orders.
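For concreteness, the three event types might carry payloads such as the following. Only the fields the join relies on (type, user.id, order.userId, and the login timestamp) come from the text; the remaining field names and values are illustrative assumptions:

```javascript
// Illustrative payloads for the three event types of interest.
// Field names beyond type, user.id, order.userId, and timestamp are assumptions.
const userCreated = {
  type: 'user-created',
  timestamp: 1420070400000,
  user: { id: 'u-1', name: 'Jane' },
};

const userLoggedIn = {
  type: 'user-loggedIn',
  timestamp: 1420156800000,
  user: { id: 'u-1' },
};

const orderSubmitted = {
  type: 'order-submitted',
  timestamp: 1420243200000,
  order: { id: 'o-9', userId: 'u-1' },
};
```

Note that the user ID appears in every payload, under user.id for user events and order.userId for order events; this is what lets the join correlate them.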
This approach leverages two tables: a view table and an event store table. The view table is the same as the one in the preceding inverse optimistic lock example, so it is excluded for brevity. The following fragment from the serverless.yml file defines the AWS DynamoDB table that acts as a component-specific event store for performing the desired join. The id attribute is the hash key; it contains the concatenation of the field values from the events of interest that effectively form the where or group by clause. In this example, we are joining the events based on the user ID. The sequence attribute is the range key, which is unique within the hash key; it will typically be the sequence number assigned by the stream or the timestamp of the individual event. The table's stream is enabled so that we can use a trigger to recalculate the view as events are captured.
Events:
  Type: AWS::DynamoDB::Table
  Properties:
    TableName: ${opt:stage}-${self:service}-event-store
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
      - AttributeName: sequence
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
      - AttributeName: sequence
        KeyType: RANGE
    ProvisionedThroughput:
      ReadCapacityUnits: 1
      WriteCapacityUnits: 1
    StreamSpecification:
      StreamViewType: NEW_AND_OLD_IMAGES
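As a hypothetical illustration of how this key schema groups related events, three captured events for the same user might be stored as the following items. The sequence values here are made-up stand-ins for stream sequence numbers; the point is that all the items share the same hash key, so a single query on id retrieves the whole group in range-key order:

```javascript
// Hypothetical event store items: same hash key (id), unique range key (sequence).
// Sequence values are illustrative stand-ins for Kinesis sequence numbers.
const storedItems = [
  { id: 'u-1', sequence: '001', event: { type: 'user-created', user: { id: 'u-1', name: 'Jane' } } },
  { id: 'u-1', sequence: '002', event: { type: 'user-loggedIn', timestamp: 1420156800000 } },
  { id: 'u-1', sequence: '003', event: { type: 'order-submitted', order: { userId: 'u-1' } } },
];

// every item shares the hash key, so one query returns the whole group
const allSameUser = storedItems.every(item => item.id === 'u-1');
```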
Next, we have the code for the stream processor, which consumes the desired events from the inter-component stream and stores them in the event store table. In this example, we are interested in the user-created, user-loggedIn, and order-submitted event types. When inserting each event into the event store table, the user ID of each event type and the stream sequence number form the key. This key solves two problems. First, it makes the save operation idempotent. Events are immutable, so if we save the same event multiple times, we only overwrite the row with the exact same values. This is an important aspect of Event Sourcing. Second, it groups related events together based on the hash key. In this example, all the events for a specific user are grouped together, effectively joining the events of interest.
import aws from 'aws-sdk';   // DynamoDB DocumentClient
import _ from 'highland';    // Highland.js stream library

export const consumer = (event, context, cb) => {
  _(event.Records)
    .map(recordToUow)
    .filter(byType)
    .flatMap(saveEvent)
    .collect().toCallback(cb);
};

// decode the Kinesis record into a unit of work (uow)
const recordToUow = record => ({
  record: record,
  event: JSON.parse(Buffer.from(record.kinesis.data, 'base64')),
});

const byType = uow =>
  uow.event.type === 'user-created' ||
  uow.event.type === 'user-loggedIn' ||
  uow.event.type === 'order-submitted';

const saveEvent = uow => {
  const params = {
    TableName: process.env.EVENT_STORE_TABLE_NAME,
    Item: {
      // user events carry user.id; order events carry order.userId
      id: uow.event.user ? uow.event.user.id : uow.event.order.userId,
      sequence: uow.record.kinesis.sequenceNumber,
      event: uow.event,
    }
  };
  const db = new aws.DynamoDB.DocumentClient();
  return _(db.put(params).promise()
    .then(() => uow));
};
Finally, we have the code for the trigger stream processor. This code is invoked each time an event is inserted into the event store table. The processor retrieves all the events from the table for the hash key that was just inserted, which returns all the events related to this newest event. In this example, it returns all the events for the specific user ID. Now we can calculate the fields for the desired view: the username, the last login time, and the count of recent orders. We reduce the list of events into a dictionary keyed by event type. We can rely on the fact that the events are returned in range-key order to simplify the logic. We only need to retain the last user-loggedIn event, and there will be at most one user-created event. We accumulate all the order-submitted events to arrive at the count. Default values are provided to account for any missing events.
This approach solves the problem of receiving events out of order. Every time an event is inserted, we recalculate the view from the full set of related events, which are retrieved in the proper order. If an older event arrives after newer ones, it may or may not change the result of the calculation. An out-of-order user-loggedIn event is simply ignored, because a later login is already in place. When a user-created event is received out of order, the view will miss the username until it arrives. Older order-submitted events just accumulate into the total count. Thus, as each additional event arrives, the view becomes more up to date, as shown in the following code.
export const trigger = (event, context, cb) => {
  _(event.Records)
    .flatMap(getRelatedEvents)
    .map(view)
    .flatMap(saveView)
    .collect().toCallback(cb);
};

const getRelatedEvents = (record) => {
  const params = {
    TableName: process.env.EVENT_STORE_TABLE_NAME,
    KeyConditionExpression: '#id = :id',
    ExpressionAttributeNames: {
      '#id': 'id'
    },
    ExpressionAttributeValues: {
      ':id': record.dynamodb.Keys.id.S
    }
  };
  const db = new aws.DynamoDB.DocumentClient();
  return _(db.query(params).promise()
    .then(data => ({
      record: record,
      data: data,
    }))
  );
};
const view = (uow) => {
  // create a dictionary by event type
  uow.dictionary = uow.data.Items.reduce((dictionary, item) => {
    // events are sorted by the range key
    if (item.event.type === 'order-submitted') {
      dictionary[item.event.type].push(item.event);
    } else {
      dictionary[item.event.type] = item.event; // later events overwrite earlier ones
    }
    return dictionary;
  }, { // default values
    'user-created': { user: { name: undefined } },
    'user-loggedIn': { timestamp: undefined },
    'order-submitted': []
  });
  // map the fields
  uow.item = {
    id: uow.record.dynamodb.Keys.id.S,
    name: uow.dictionary['user-created'].user.name,
    lastLogin: uow.dictionary['user-loggedIn'].timestamp,
    recentOrderCount: uow.dictionary['order-submitted'].length,
  };
  return uow;
};
const saveView = (uow) => {
  const params = {
    TableName: process.env.VIEW_TABLE_NAME,
    Item: uow.item,
  };
  const db = new aws.DynamoDB.DocumentClient();
  return _(db.put(params).promise());
};
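To see the reduction in isolation, here is a self-contained sketch of the view calculation on a sample batch of event store items. The payloads and values are illustrative, and the reduce mirrors the dictionary-building logic of the view function above:

```javascript
// Standalone sketch of the view calculation; payloads are illustrative.
const items = [ // already sorted by the range key (sequence)
  { event: { type: 'user-created', user: { id: 'u-1', name: 'Jane' } } },
  { event: { type: 'order-submitted', order: { userId: 'u-1' } } },
  { event: { type: 'user-loggedIn', timestamp: 1420156800000 } },
  { event: { type: 'order-submitted', order: { userId: 'u-1' } } },
];

const dictionary = items.reduce((dict, item) => {
  if (item.event.type === 'order-submitted') {
    dict[item.event.type].push(item.event); // accumulate orders
  } else {
    dict[item.event.type] = item.event;     // later events overwrite earlier ones
  }
  return dict;
}, { // default values cover any missing events
  'user-created': { user: { name: undefined } },
  'user-loggedIn': { timestamp: undefined },
  'order-submitted': []
});

const viewItem = {
  id: 'u-1',
  name: dictionary['user-created'].user.name,             // 'Jane'
  lastLogin: dictionary['user-loggedIn'].timestamp,       // 1420156800000
  recentOrderCount: dictionary['order-submitted'].length, // 2
};
```

Rerunning this reduce after an out-of-order event is inserted yields the same kind of result, because the query always returns the full, sorted set of related events.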