- Architecting Cloud Native Applications
- Kamal Arora Erik Farr John Gilbert Piyum Zonooz
- 861字
- 2021-06-24 15:21:07
Example – offline-first counter
This example demonstrates the essentials for storing and synchronizing data on the client side. I have kept the example simple to focus on the plumbing. The example is implemented as a React application that leverages an AWS Cognito Dataset to store a counter and the square of the counter. The user increments the counter and it is stored locally. When the user presses the synchronize button the counter-updated event is published to the event stream. The consumer of this event calculates a materialized view that is the square of the counter and stores the value on the cloud side for synchronization back to the client. This example can easily be mapped to an e-commerce application, which stores the cart (that is, the counter) and order status (that is, the square) on the client or a customer survey application that stores answers locally until the survey is completed.
First, we have the React application that displays the counter and its square and renders the Increment and Synchronize buttons. Synchronize would typically happen in the background on load, periodically and/or on specific user actions.
class App extends React.Component {
constructor(props) {
super(props)
this.state = { count: 0, squared: 0 };
}
componentWillMount() { ... }
increment() { ... }
synchronize() { ... }
render() {
return (
<div>
<h1>Counter: {this.state.count}</h1>
<h1>Squared: {this.state.squared}</h1>
<button onClick={this.increment.bind(this)}>Increment</button>
<button onClick={this.synchronize.bind(this)}>Synchronize</button>
</div>
);
}
}
ReactDOM.render(
<App />,
document.getElementById('root')
);
Next, we have the componentWillMount method, which is part of the React standard component life cycle. This method initializes the Cognito client, retrieves the data from local storage, and sets the values in the component's state, which triggers re-rendering. In this example, the user is unauthenticated. If the user previously accessed the identity pool, then credentials will be present in local storage; otherwise, a new user is created and the dataset is initialized with zeros.
componentWillMount() {
AWS.config.region = 'us-east-1';
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
IdentityPoolId: 'us-east-1:xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx',
});
AWS.config.credentials.get(() => {
const syncClient = new AWS.CognitoSyncManager();
syncClient.openOrCreateDataset('counter', (err, dataset) => {
dataset.get('count', (err, value) => {
this.setState({
count: value ? Number(value) : 0
});
});
dataset.get('squared', (err, value) => {
this.setState({
squared: value ? Number(value) : 0
});
});
this.dataset = dataset;
});
});
}
The increment() method just adds one to the current counter value, saves it in local storage, and updates the components state to trigger re-rendering. This is analogous to an add to cart method that stores the cart as a JSON string.
increment() {
const val = this.state.count + 1;
this.dataset.put('count', String(val), (err, record) => {
this.setState({
count: val
});
});
}
The synchronize() method sends state changes from local storage to the cloud and retrieves state changes from the cloud. The component's state is once again updated to re-render the screen. In an e-commerce example, this could be analogous to the final submission of an order in a checkout flow. The cart has been transformed to an order and stored locally with a submitted status.
synchronize() {
this.dataset.synchronize({
onSuccess: (data, newRecords) => {
this.dataset.get('squared', (err, value) => {
this.setState({
squared: value
});
});
}
});
}
This is an example of a stream processor that is performing Database-First Event Sourcing logic on the changes streaming from AWS Cognito as the result of a synchronization. A counter-updated event is published when a count sync is found. Continuing our e-commerce analogy, this could be reacting to a sync for an order with a submitted status and publish an order-submitted event. In a customer survey system, this would be looking for a survey completed status.
export const trigger = (event, context, cb) => {
_(event.Records)
.flatMap(recordToUow)
.filter(forCountSync)
.map(toEvent)
.flatMap(publish)
.collect().toCallback(cb);
};
const forCountSync = uow => uow.sync.key === 'count';
const toEvent = uow => {
uow.event = {
id: uuid.v1(),
type: `counter-updated`,
timestamp: uow.sync.lastModifiedDate,
partitionKey: uow.record.kinesis.partitionKey,
tags: {
identityPoolId: uow.data.identityPoolId,
identityId: uow.data.identityId,
datasetName: uow.data.datasetName
},
record: uow.sync
}
return uow;
}
This is an example of a stream processor that is calculating a materialized view for a specific user that is stored in an AWS Cognito Dataset instead of a DynamoDB table, a search engine, or an S3 bucket. This example is calculating the square of the latest counter value. AWS Cognito requires a read (listRecords) operation prior to a write (updateRecords) operation. This allows for retrieving the syncCount values, which act as optimistic locks. These values could be leveraged to perform an Inverse OpLock. We could also leverage the Event sourced join technique as well. The point being that this is just a materialized view that is user-specific and will ultimately be synchronized to that user's devices. Continuing the e-commerce analogy, this would be consuming events from order-fulfillment components that are publishing order status updates. Any data can be streamed to a specific client using this approach.
module.exports.consumer = (event, context, cb) => {
_(event.Records)
.map(recordToUow)
.filter(forCounterUpdated)
.flatMap(saveSquare)
.collect().toCallback(cb);
};
const forCounterUpdated = uow => uow.event.type === 'counter-updated';
const saveSquare = uow => {
const db = new aws.CognitoSync();
uow.params = {
IdentityPoolId: uow.event.tags.identityPoolId,
IdentityId: uow.event.tags.identityId,
DatasetName: uow.event.tags.datasetName
};
return _(db.listRecords(uow.params).promise()
.then(data => {
uow.params.SyncSessionToken = data.SyncSessionToken;
uow.params.RecordPatches = [{
Key: 'squared',
Value: String(Math.pow(Number(uow.event.record.value), 2)),
Op: 'replace',
SyncCount: data.DatasetSyncCount
}];
return db.updateRecords(uow.params).promise()
}));
}