AWS Step Functions has built-in features for catching and handling errors but, surprisingly, no semantics for the “finally” concept that usually accompanies them.
In my scenario I create an ephemeral Kinesis stream in my state machine and then stream a large number of records into it from a single Lambda function. I then process those records more slowly in a series of subsequent Lambda functions. Once that completes, I delete the ephemeral Kinesis stream.
The problem with this approach is that an unexpected error in any one of those steps can fail the whole execution and orphan the Kinesis stream. I therefore needed a try/finally pattern to reduce the likelihood of that happening.
To accomplish this, first imagine we have this step function:
[code]
StartAt: ConfigureIterator
States:
  ConfigureIterator:
    Type: Pass
    Result:
      limit: 500
    ResultPath: $.iterator
    Next: InitializeIterator
  # Creates the ephemeral Kinesis stream
  InitializeIterator:
    Type: Task
    Resource: iterator
    InputPath: $.iterator
    ResultPath: $.iterator
    Next: ConfigureXmlStream
  ConfigureXmlStream:
    Type: Pass
    Result:
      gz: true
      root: item
    ResultPath: $.options
    Next: XmlStream
  # Streams items from the XML document into the Kinesis stream
  XmlStream:
    Type: Task
    Resource: xmlstream
    ResultPath: $.xml
    Next: SendItemsToApi
  # Reads items from the stream and sends them to the REST endpoint
  SendItemsToApi:
    Type: Task
    Resource: items2api
    ResultPath: $.iterator
    Next: IterateNext
  # Loops back to SendItemsToApi until the iterator reports it is done
  IterateNext:
    Type: Choice
    Choices:
      - Variable: $.iterator.state
        StringEquals: done
        Next: Cleanup
    Default: SendItemsToApi
  Cleanup:
    Type: Pass
    Result: done
    ResultPath: $.iterator.state
    Next: IteratorDone
  # Deletes the Kinesis stream
  IteratorDone:
    Type: Task
    Resource: iterator
    InputPath: $.iterator
    ResultPath: $.iterator
    Next: Done
  Done:
    Type: Pass
    End: true
[/code]
In the InitializeIterator step we create our ephemeral Kinesis stream. In the XmlStream step we stream items from a large XML document, convert them into JSON objects, and write those to the stream. Next, in the SendItemsToApi step we read items out of the Kinesis stream, format and validate them, and send each one to a REST endpoint for storage and/or other actions; the IterateNext Choice state loops back to SendItemsToApi until the iterator's state is done. Finally, in the IteratorDone step we destroy the Kinesis stream.
You could imagine a variety of other scenarios where resources allocated in an earlier step need to be cleaned up. In this particular scenario we need to ensure that the IteratorDone step runs regardless of any errors that happen between it and the InitializeIterator step.
To do this, we first wrap the XmlStream, SendItemsToApi and IterateNext steps in a Parallel block with a single branch. That way these steps can be treated as a single block, and an error in any of them can be caught and handled by a single Catch clause.
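To make the intended control flow concrete, here is a small local model of the pattern in plain JavaScript. It is only a sketch of the semantics, not something Step Functions runs: the task functions passed in (initializeIterator, main, iteratorDone) are hypothetical stand-ins for the Lambda functions, retries happen without any delay, and the stored error only approximates the { Error, Cause } object a Catch clause records.

[code language="javascript"]
// Local, synchronous model of the try/finally pattern built from Step Functions states.
// The tasks object holds hypothetical stand-ins for the Lambda functions.

// Mirrors a Retry clause: one initial attempt plus up to maxAttempts retries
// (the waits between retries are not modelled here).
function runWithRetry (fn, maxAttempts) {
  let lastError
  for (let attempt = 0; attempt <= maxAttempts; attempt++) {
    try {
      return fn()
    } catch (err) {
      lastError = err
    }
  }
  throw lastError
}

function runPattern (input, tasks) {
  const state = { ...input }
  // InitializeIterator: allocate the resource (the Kinesis stream).
  state.iterator = tasks.initializeIterator(state.iterator)
  try {
    // Main: the single-branch Parallel block; its output array goes to $.main.
    state.main = [runWithRetry(() => tasks.main(state), 3)]
  } catch (err) {
    // Catch with ResultPath: $.error records the failure and keeps the rest of the state.
    state.error = { Error: err.name, Cause: JSON.stringify({ errorMessage: err.message }) }
  }
  // Cleanup + IteratorDone: run on both the success and the failure path.
  state.iterator = tasks.iteratorDone({ ...state.iterator, state: 'done' })
  // Finally: rethrow so the execution ends as failed instead of succeeded.
  if (state.error) throw new Error(JSON.parse(state.error.Cause).errorMessage)
  return state
}
[/code]

Whether or not main throws, iteratorDone is called exactly once, and a failure in main still surfaces as a thrown error at the very end, which is the try/finally behaviour the state machine is meant to reproduce.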
The three steps wrapped in a Parallel block now look like this:
[code]
Main:
  Type: Parallel
  Branches:
    - StartAt: XmlStream
      States:
        XmlStream:
          Type: Task
          Resource: xmlstream
          ResultPath: $.xml
          Next: SendItemsToApi
        SendItemsToApi:
          Type: Task
          Resource: items2api
          ResultPath: $.iterator
          Next: IterateNext
        IterateNext:
          Type: Choice
          Choices:
            - Variable: $.iterator.state
              StringEquals: done
              Next: BranchDone
          Default: SendItemsToApi
        # States in a branch may only transition within that branch,
        # so the loop exits through a terminal state of its own
        BranchDone:
          Type: Succeed
  Next: Cleanup
  ResultPath: $.main
  Retry:
    - ErrorEquals: [ 'States.ALL' ]
      MaxAttempts: 3
  Catch:
    - ErrorEquals: [ 'States.ALL' ]
      ResultPath: $.error
      Next: Cleanup
[/code]
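One rule that is easy to trip over with Parallel blocks: the Amazon States Language only lets a state inside a branch transition to other states in that same branch. The branch is left when it reaches a terminal state, and the Parallel state's own Next or Catch decides where execution goes after that. The sketch below is a hypothetical checker (findBadTransitions is not part of any AWS tool) that works on a definition already parsed into a plain object; it flags targets that do not exist in the current scope, such as a Choice inside the branch pointing at Cleanup, or a Next pointing at a state that was never defined. It only understands StartAt, Next, Default, Choices, Catch and Parallel Branches.

[code language="javascript"]
// Hypothetical checker for a state machine definition parsed into a plain object.
// Every StartAt/Next/Default/Choice/Catch target must name a state in the same
// States map; each Parallel branch is checked as its own, separate scope.
function findBadTransitions (machine, scope = 'root') {
  const problems = []
  const names = new Set(Object.keys(machine.States))
  if (!names.has(machine.StartAt)) problems.push(`${scope}: StartAt -> ${machine.StartAt}`)
  for (const [name, def] of Object.entries(machine.States)) {
    const targets = []
    if (def.Next) targets.push(def.Next)
    if (def.Default) targets.push(def.Default)
    for (const choice of def.Choices || []) targets.push(choice.Next)
    for (const handler of def.Catch || []) targets.push(handler.Next)
    for (const target of targets) {
      if (!names.has(target)) problems.push(`${scope}/${name} -> ${target}`)
    }
    // Recurse into branches: their states can only reach states in the same branch.
    for (const [i, branch] of (def.Branches || []).entries()) {
      problems.push(...findBadTransitions(branch, `${scope}/${name}[${i}]`))
    }
  }
  return problems
}

// A branch whose Choice tries to jump straight to a state outside the branch.
const definition = {
  StartAt: 'Main',
  States: {
    Main: {
      Type: 'Parallel',
      Branches: [{
        StartAt: 'IterateNext',
        States: {
          IterateNext: {
            Type: 'Choice',
            Choices: [{ Variable: '$.iterator.state', StringEquals: 'done', Next: 'Cleanup' }],
            Default: 'IterateNext'
          }
        }
      }],
      Next: 'Cleanup'
    },
    Cleanup: { Type: 'Pass', End: true }
  }
}

console.log(findBadTransitions(definition)) // [ 'root/Main[0]/IterateNext -> Cleanup' ]
[/code]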
It’s important to note that the output of a Parallel block is an array with one element per branch, each element being the output of that branch's final state. So in this case we will have an array with a single object in it: [ { iterator: ... } ]. If you don’t specify a ResultPath, that array replaces the entire state input ($), which is undesirable here because we still need to access the iterator object in a later step.
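A simplified model of how ResultPath places a state's result into its input may help here. This is a sketch, not the service's implementation: the hypothetical applyResultPath below only understands the default ($), null, and plain dotted paths such as $.main or $.iterator.state, whereas real ResultPath values are JSONPath reference paths. The stream name in the sample input is made up.

[code language="javascript"]
// Simplified model of ResultPath: where a state's result is written into its input.
// Supports only the default ('$'), null, and plain dotted paths like '$.main'.
function applyResultPath (input, resultPath, result) {
  if (resultPath === undefined || resultPath === '$') return result // result replaces the whole input
  if (resultPath === null) return input // result is discarded, input passes through
  const keys = resultPath.replace(/^\$\./, '').split('.')
  const output = JSON.parse(JSON.stringify(input)) // state data is JSON, so copy it
  let node = output
  for (const key of keys.slice(0, -1)) {
    if (typeof node[key] !== 'object' || node[key] === null) node[key] = {}
    node = node[key]
  }
  node[keys[keys.length - 1]] = result
  return output
}

const input = { iterator: { stream: 'example-stream' }, options: { gz: true } }
const branchOutputs = [{ iterator: { state: 'done' } }] // one entry per branch

console.log(applyResultPath(input, '$.main', branchOutputs).iterator) // { stream: 'example-stream' }
console.log(Array.isArray(applyResultPath(input, undefined, branchOutputs))) // true
[/code]

With ResultPath: $.main the original iterator survives next to the branch outputs; with no ResultPath the state's output is just the array, and the iterator is gone.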
It’s also important to note that we are storing the caught exception in the $.error field, so that we can rethrow it later, after cleanup.
[code]
Cleanup:
  Type: Pass
  Result: done
  ResultPath: $.iterator.state
  Next: IteratorDone
IteratorDone:
  Type: Task
  Resource: iterator
  InputPath: $.iterator
  ResultPath: $.iterator
  Next: Finally
Finally:
  Type: Task
  Resource: throwOnError
  Next: Done
Done:
  Type: Pass
  End: true
[/code]
So now if an error occurs while processing our XML file or sending items to the API, the Parallel block will retry up to three times (each retry reruns the whole branch, starting again at XmlStream) and then catch the error and move on to the Cleanup state. We’ve also added a new Finally step, which throws an exception if a value is stored in $.error. That lets the execution end in a failed state rather than a succeeded one, so we can still trigger alarms through CloudWatch.
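Because the Retry clause on the Main Parallel block sets only MaxAttempts, its other timing fields fall back to the documented defaults: IntervalSeconds of 1 and BackoffRate of 2.0. The hypothetical retryDelays helper below computes the wait before each retry from those fields; it is a sketch that ignores any other optional retry settings.

[code language="javascript"]
// Seconds Step Functions waits before each retry of a single Retry clause.
// Omitted fields fall back to the defaults: MaxAttempts 3, IntervalSeconds 1, BackoffRate 2.0.
// Any other optional retry settings are ignored in this sketch.
function retryDelays ({ MaxAttempts = 3, IntervalSeconds = 1, BackoffRate = 2.0 } = {}) {
  const delays = []
  for (let retry = 0; retry < MaxAttempts; retry++) {
    // The interval is multiplied by BackoffRate after every retry.
    delays.push(IntervalSeconds * BackoffRate ** retry)
  }
  return delays
}

const delays = retryDelays({ MaxAttempts: 3 })
console.log(delays) // [ 1, 2, 4 ]
console.log(delays.length + 1) // 4 attempts in total
[/code]

So a persistently failing branch runs four times, with roughly seven seconds of waiting on top of each attempt's own run time, before the Catch clause hands control to Cleanup.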
Here is the code for the throwOnError Lambda function:
[code language="javascript"]
// log, parse and handler come from the author's shared package
import { log, parse, handler } from 'mya-input-shared'

// An Error rebuilt from the serialized error a Catch clause stored in $.error,
// keeping the original name, message and stack frames.
function RehydratedError (message, name, stack) {
  const tmp = Error.apply(this, arguments)
  this.name = tmp.name = name
  this.message = tmp.message = message
  Object.defineProperty(this, 'stack', {
    get: () => [`${this.name}: ${this.message}`].concat(stack).join('\n    at ')
  })
  return this
}

RehydratedError.prototype = Object.create(Error.prototype, {
  constructor: {
    value: RehydratedError,
    writable: true,
    configurable: true
  }
})

export const throwOnError = handler((event, context, callback) => {
  const { feed, error } = event
  if (error) {
    // For Lambda failures, Cause is a JSON string with errorMessage, errorType and stackTrace
    const Cause = error.Cause || '{}'
    parse(Cause, (err, cause) => {
      if (err) return callback(err)
      const { errorMessage, errorType, stackTrace } = cause
      const rehydrated = new RehydratedError(
        errorMessage || 'An unknown error occurred.',
        errorType || 'UnknownError',
        stackTrace || [])
      // log.error receives the callback and is expected to finish the invocation with the error
      log.error('feed_error', rehydrated, { feed }, callback)
    })
  } else {
    // No error was caught: pass the state through unchanged
    callback(null, event)
  }
})
[/code]
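Since mya-input-shared is the author's own package, here is a dependency-free sketch of the same idea for anyone who wants to try it without it: an async Node.js handler that rejects instead of calling back. Like the code above, it assumes Cause holds the JSON-serialized Lambda error with errorMessage, errorType and stackTrace, and falls back to defaults otherwise; the names rehydrate and throwOnError are illustrative, and you would export throwOnError as the function's handler.

[code language="javascript"]
// Standalone sketch of the Finally step without the mya-input-shared helpers.
// Assumes a Node.js Lambda runtime with async handlers.

// Rebuild an Error from the { Error, Cause } object a Catch clause stored in $.error.
function rehydrate (caught) {
  let cause
  try {
    const parsed = JSON.parse(caught.Cause || '{}')
    cause = parsed && typeof parsed === 'object' ? parsed : {}
  } catch (_) {
    // Cause is not JSON (e.g. a States.* error with a plain-text cause).
    cause = { errorMessage: caught.Cause }
  }
  const err = new Error(cause.errorMessage || 'An unknown error occurred.')
  err.name = cause.errorType || caught.Error || 'UnknownError'
  if (Array.isArray(cause.stackTrace)) {
    // Frame formatting differs between Lambda runtimes, so this is best-effort.
    err.stack = [`${err.name}: ${err.message}`, ...cause.stackTrace].join('\n')
  }
  return err
}

// Finally step: pass the state through untouched, or fail the execution.
async function throwOnError (event) {
  if (!event.error) return event
  const err = rehydrate(event.error)
  console.error('feed_error', { feed: event.feed, name: err.name, message: err.message })
  // A rejected promise fails the invocation, and therefore this Task state.
  throw err
}
[/code]

As long as the Finally state has no Retry or Catch of its own, that rejection is what moves the whole execution into a failed state.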