- Architecting Cloud Native Applications
- Kamal Arora Erik Farr John Gilbert Piyum Zonooz
- 1121字
- 2021-06-24 15:21:03
Example – stream processor flow control
The following example is an AWS Kinesis stream processor running as an AWS Lambda function. The code is implemented using the functional reactive programming paradigm supported by the Highland.JS streaming library. Similar functionality can be implemented with libraries such as RxJS. This main block of code sets up a pipeline of multiple steps, which the micro-batch of events contained in the event.Records array will flow through. This stream processor is responsible for capturing Item Created and Item Updated events and storing the data in DynamoDB. It demonstrates validation errors, unexpected errors, resource errors, and the handling of these errors.
export const consume = (event, context, cb) => {
_(event.Records)
.map(recordToUow)
.filter(forItemCreatedOrUpdated)
.tap(validate)
.tap(randomError)
.rateLimit(3, 30) // 3 per 30ms
.map(save)
.parallel(3)
.errors(errors)
.collect().toCallback(cb);
};
Each record is parsed into a unit of work (uow) that must succeed or fail together. In this example, the uow is a single event. However, many processors will group events together in various ways based on the contents of the events. In these cases, the uow will wrap all the events. Next, we filter for the events of interest. All other events are ignored. Note that this processor is interested in multiple types of events. It is typical that different event types will encounter different types of errors. The primary benefit of this pattern is that one event type will not prevent other events, types from processing when it is experiencing problems. This is in effect a micro bulkhead.
const recordToUow = rec => ({
record: rec,
event: JSON.parse(new Buffer(rec.kinesis.data, 'base64'))
});
const forItemCreatedOrUpdated = uow =>
uow.event.type === 'item-created' || uow.event.type === 'item-updated';
Just as in any other type of program, it is important to assert that the input data meets the expected criteria. However, validation errors are unrecoverable because the event is already in the stream, so the processor will need to let the event pass through. When an event in a stream is invalid, it is an indication that there is likely a bug in an upstream component. The event will need to be set aside and resolved out of band and then resubmitted as needed. In this example, we throw a validation error when the required fields are not provided. When we manually create errors, we adorn the uow to indicate to the error handler that this is a handled error.
const validate = uow => {
if (uow.event.item.name === undefined) {
const err = new Error('Validation Error: name is required');
err.uow = uow; // handled
throw err;
}
}
The following is a simulation of an unexpected transient error. These unhandled errors do not generate fault events and will cause processing to stop and the function to retry, until either the event goes through or it expires. Retry can be a good thing for transient errors, as it gives the source of the error the chance to self-heal and the stream processor will make forward progress once the problem corrects itself.
const randomError = () => {
if (Math.floor((Math.random() * 5) + 1) === 3) {
throw new Error('Random Error'); // unhandled
}
}
The following is a basic example of catching a resource error. First, we initialize the timeout to a reasonably short value and we will rely on the SDK to retry several times with an exponential backoff. If an error still occurs after the retries, then we catch the error, adorn the uow, and rethrow the error. This is a good starting point for any stream processor. All errors will be handled and produce faults, thus allowing the stream to continue processing. As we learn which types of errors can self-heal and which cannot, we can tune the logic, based on the status code and content of the error, to allow the self-healing errors to fall through, cause the processing to stop and the function to retry. This is also the place where we could optionally leverage a traditional circuit breaker. When the circuit is already open, an error would automatically be thrown, without the overhead of calling the resource and waiting on multiple retries. The same logic would apply regarding whether the error causes a fault or a retry. This decision will often depend on whether the resource is a cloud service or an external service. If a cloud service is failing then we will most likely be failing over to another region and thus it might be appropriate to flush all the events through as faults and resubmit them after the region recovers. If an external service is failing, then it may be best to continue to retry.
const save = uow => {
const params = {
TableName: process.env.TABLE_NAME,
Item: uow.event.item,
};
const db = new aws.DynamoDB.DocumentClient({
httpOptions: { timeout: 1000 }
});
return _(db.put(params).promise()
.catch(err => {
err.uow = uow; // handled
throw err;
}));
}
This is the actual error handler. When an error is thrown from a step in the stream pipeline, it will pass over all the remaining steps until it hits an error handler or the end of the pipeline. If the error hits the end of the pipeline, then the processing will stop and the function will retry. Here the error handler inspects the error to determine if it is adorned with a uow. If there is no uow, then the error is considered unhandled and it is rethrown to stop the processing. Otherwise, an error with a uow is considered handled and it will publish a fault event and allow processing to continue. The fault event contains the error information, the unit of work, and the name of the processor that raised the fault. This information serves two purposes. First, it provides the necessary context for investigating the root cause of the fault. Second, all the events contained within the uow can be resubmitted to the specified processor by the resubmission tool if deemed necessary.
const errors = (err, push) => {
if (err.uow) { // handled errors
push(null, publish({
type: 'fault',
timestamp: Date.now(),
tags: {
functionName: process.env.AWS_LAMBDA_FUNCTION_NAME,
},
err: {
name: err.name,
message: err.message,
stack: err.stack,
},
uow: err.uow,
}));
} else { // unhandled errors
push(err); // rethrow
}
}
Here we are publishing the fault event to the stream. The data lake will consume the fault event, as a fault is an event like any other. The monitoring system should also consume and alert on fault events. When deemed necessary, the events in the uow can be resubmitted from the data lake to the source function.
const publish = event => {
const params = {
StreamName: process.env.STREAM_NAME,
PartitionKey: uuid.v4(),
Data: new Buffer(JSON.stringify(event)),
};
const kinesis = new aws.Kinesis({
httpOptions: { timeout: 1000 }
});
return _(kinesis.putRecord(params).promise());
}