Try / Finally with AWS Step Functions

AWS Step Functions has some built-in features for catching and handling errors but, surprisingly, it has no semantics for the “finally” concept that usually accompanies them.

In my scenario I create an ephemeral Kinesis stream in my State Machine and stream a large number of records into it from a single Lambda. I then process those records more slowly in a series of subsequent Lambda functions. Once processing is complete, I delete the ephemeral Kinesis stream.

The problem with this approach is that an unexpected error anywhere in one of my steps can cause the whole Step Function to fail and orphan the Kinesis stream. I therefore needed a way to reduce the likelihood of this happening with a try/finally pattern.

To accomplish this, first imagine we have this step function:

StartAt: ConfigureIterator
States:
  ConfigureIterator:
    Type: Pass
    Result:
      limit: 500
    ResultPath: $.iterator
    Next: InitializeIterator
  InitializeIterator:
    Type: Task
    Resource: iterator
    InputPath: $.iterator
    ResultPath: $.iterator
    Next: ConfigureXmlStream
  ConfigureXmlStream:
    Type: Pass
    Result:
      gz: true
      root: item
    ResultPath: $.options
    Next: XmlStream
  XmlStream:
    Type: Task
    Resource: xmlstream
    ResultPath: $.xml
    Next: SendItemsToApi
  SendItemsToApi:
    Type: Task
    Resource: items2api
    ResultPath: $.iterator
    Next: IterateNext
  IterateNext:
    Type: Choice
    Choices:
      - Variable: $.iterator.state
        StringEquals: done
        Next: Cleanup
    Default: SendItemsToApi
  Cleanup:
    Type: Pass
    Result: done
    ResultPath: $.iterator.state
    Next: IteratorDone
  IteratorDone:
    Type: Task
    Resource: iterator
    InputPath: $.iterator
    ResultPath: $.iterator
    Next: Done
  Done:
    Type: Pass
    End: true

In the InitializeIterator step we create our ephemeral Kinesis stream. In the XmlStream step we stream items from a large XML document into JSON objects, which are then written to the stream. Next, in the SendItemsToApi step we read items out of the Kinesis stream, do some formatting and validation on those items, and send each item to a REST endpoint for storage and/or other actions. Finally, in the IteratorDone step we destroy the Kinesis stream.

You could imagine a variety of other scenarios where one would need to clean up resources allocated in a previous step. In this particular scenario we need to ensure that the IteratorDone step is called regardless of any errors that may happen between it and the InitializeIterator step.

To do this we will first wrap the XmlStream, SendItemsToApi and IterateNext steps in a Parallel block with a single branch. The reason for this is that these steps can then be treated as a single block, where an error in any of them can be caught and handled by a single Catch clause.

The three steps wrapped in a Parallel block now look like this (note the extra terminal Pass state: a branch cannot transition to states outside the Parallel block, so it needs its own end state):

  Main:
    Type: Parallel
    Branches:
      - StartAt: XmlStream
        States:
          XmlStream:
            Type: Task
            Resource: xmlstream
            ResultPath: $.xml
            Next: SendItemsToApi
          SendItemsToApi:
            Type: Task
            Resource: items2api
            ResultPath: $.iterator
            Next: IterateNext
          IterateNext:
            Type: Choice
            Choices:
              - Variable: $.iterator.state
                StringEquals: done
                Next: BranchDone
            Default: SendItemsToApi
          # A branch cannot transition to states outside the Parallel block,
          # so the branch ends here and the outer machine moves on to Cleanup.
          BranchDone:
            Type: Pass
            End: true
    Next: Cleanup
    ResultPath: $.main
    Retry:
      - ErrorEquals: [ 'States.ALL' ]
        MaxAttempts: 3
    Catch:
      - ErrorEquals: [ 'States.ALL' ]
        ResultPath: $.error
        Next: Cleanup

It’s important to note here that the result of the Parallel block is an array of results, where each element is the output of the last step of the corresponding branch. So in this case we will have an array with a single object in it: [ { iterator: ... } ]. If you don’t specify a ResultPath it will replace the entire context object $, which is undesirable in this case since we still need to access the iterator object in a later step.
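
To make that concrete, here is roughly the shape of the execution state after the Main block completes with ResultPath set to $.main (the field contents are illustrative):

// Rough sketch of the state object after the Main (Parallel) state completes.
// The branch output is bucketed under $.main, so $.iterator from
// InitializeIterator is still reachable by the later Cleanup/IteratorDone steps.
const stateAfterMain = {
  iterator: { /* result of InitializeIterator */ },
  options: { gz: true, root: 'item' },
  main: [
    { iterator: { state: 'done' /* , ... */ } } // output of the single branch
  ]
}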

It’s also important to note that we are storing the caught exception into the $.error field, which we will rethrow later, after cleanup.
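
For reference, what the Catch clause stores under $.error looks roughly like this; the Cause field is a JSON string containing the failed Lambda’s error details, which is why it has to be parsed before the error can be rehydrated and rethrown:

// Approximate shape of a caught Lambda error as stored by the Catch clause
// (values are illustrative).
const caughtError = {
  Error: 'Error', // the error name reported by the failed task
  Cause: '{"errorMessage":"stream write failed","errorType":"Error","stackTrace":["..."]}'
}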

  Cleanup:
    Type: Pass
    Result: done
    ResultPath: $.iterator.state
    Next: IteratorDone

  IteratorDone:
    Type: Task
    Resource: iterator
    InputPath: $.iterator
    ResultPath: $.iterator
    Next: Finally

  Finally:
    Type: Task
    Resource: throwOnError
    Next: Done

  Done:
    Type: Pass
    End: true

So now if an error occurs while processing our XML file or sending items to the API, it will retry a few times and then ultimately capture the error and move to the Cleanup phase. We’ve also added a new Finally step, which will throw an exception if there is a value stored in $.error. This allows the Step Function to complete in an Error state rather than a Success state, so we can trigger alarms through CloudWatch.

Here is the code for the throwOnError lambda:

import { log, parse, handler } from 'mya-input-shared'

// Rebuild an Error from the serialized errorMessage/errorType/stackTrace
// details found in the Step Functions Cause string, so it can be rethrown
// with a meaningful name, message and stack.
function RehydratedError (message, name, stack) {
  const tmp = Error.apply(this, arguments)
  this.name = tmp.name = name
  this.message = tmp.message = message
  Object.defineProperty(this, 'stack', {
    get: () => [`${this.name}: ${this.message}`].concat(stack).join('\n    at ')
  })
  return this
}

RehydratedError.prototype = Object.create(Error.prototype, {
  constructor: {
    value: RehydratedError,
    writable: true,
    configurable: true
  }
})

export const throwOnError = handler((event, context, callback) => {
  const { feed, error } = event
  if (error) {
    // error.Cause is a JSON string describing the original Lambda failure.
    const Cause = error.Cause || '{}'
    parse(Cause, (err, cause) => {
      if (err) return callback(err)
      const { errorMessage, errorType, stackTrace } = cause
      err = new RehydratedError(
        errorMessage || 'An unknown error occurred.',
        errorType || 'UnknownError',
        stackTrace || '')
      log.error('feed_error', err, { feed }, callback)
    })
  } else {
    callback(null, event)
  }
})

Iterating with AWS Step Functions

One interesting challenge I immediately encountered when attempting to work with AWS Lambda and Step Functions was the need to process large files. Lambda functions have a couple of limitations, namely memory and a 5-minute timeout. If you have some operation you need to perform on a very large dataset, it may not be possible to complete it in a single execution of a Lambda function. There are several ways to solve this problem; in this article I would like to demonstrate how to create an iterator pattern in an AWS Step Function as a way to loop over a large set of data and process it in smaller parts.


In order to iterate we have created an Iterator task, which is a custom Lambda function. It accepts three values as input in order to operate: index, step and count.

Here is the code for this example step function:

{
    "Comment": "Iterator Example",
    "StartAt": "ConfigureCount",
    "States": {
        "ConfigureCount": {
            "Type": "Pass",
            "Result": 10,
            "ResultPath": "$.count",
            "Next": "ConfigureIterator"
        },
        "ConfigureIterator": {
            "Type": "Pass",
            "Result": {
                "index": -1,
                "step": 1
            },
            "ResultPath": "$.iterator",
            "Next": "Iterator"
        },
        "Iterator": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:{region}:{accountId}:function:iterator",
            "ResultPath": "$.iterator",
            "Next": "IterateRecords"
        },
        "IterateRecords": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": "$.iterator.continue",
                    "BooleanEquals": true,
                    "Next": "ExampleWork"
                }
            ],
            "Default": "Done"
        },
        "ExampleWork": {
            "Type": "Pass",
            "Result": {
              "success": true
            },
            "ResultPath": "$.result",
            "Next": "Iterator"
        },
        "Done": {
            "Type": "Pass",
            "End": true
        }
    }
}

ConfigureCount

In this step we configure the number of times we want to iterate. In this case I have set the number of iterations to 10 and put it into a variable called $.count. In a more complete example this might be the number of files you want to iterate over. For example, in my real-world scenario I receive a substantial CSV file, which is then broken into many smaller CSV files, all stored in S3; the number of smaller files is then set into the count variable here. The large CSV file can be read in a single Lambda execution by streaming sections into smaller files, never loading the entire file into memory at once, but it cannot be processed entirely in a single function. So we split it and then iterate over the smaller parts.
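
As a rough sketch of that splitting step (not my exact code), a Lambda like the following could stream the large CSV out of S3, write fixed-size parts back to S3 and return the part count that ends up in $.count. The bucket, key and part size used here are hypothetical:

import AWS from 'aws-sdk'
import readline from 'readline'

const s3 = new AWS.S3()
const LINES_PER_PART = 10000 // hypothetical part size

// Hypothetical splitter: streams a large CSV out of S3 line by line,
// writes fixed-size parts back to S3 and returns the part count,
// which the state machine would store in $.count.
// (CSV header handling is omitted for brevity.)
export function splitCsv (event, context, callback) {
  const { bucket, key } = event // hypothetical input fields
  const input = s3.getObject({ Bucket: bucket, Key: key }).createReadStream()
  const lines = readline.createInterface({ input })

  let part = 0
  let buffer = []
  const uploads = []

  const flush = () => {
    if (!buffer.length) return
    uploads.push(s3.upload({
      Bucket: bucket,
      Key: `${key}.part-${part}`,
      Body: buffer.join('\n')
    }).promise())
    part += 1
    buffer = []
  }

  lines.on('line', (line) => {
    buffer.push(line)
    if (buffer.length >= LINES_PER_PART) flush()
  })

  lines.on('close', () => {
    flush()
    Promise.all(uploads)
      .then(() => callback(null, { count: part }))
      .catch(callback)
  })
}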

ConfigureIterator

Here we set the index and step variables into the $.iterator field, which the iterator lambda uses to determine whether or not it should continue iterating.

Iterator

This is the iterator itself, a small lambda function that simply increments the current index by the step size and calculates the continue field based on the current index and count.

export function iterator (event, context, callback) {
  let index = event.iterator.index
  let step = event.iterator.step
  let count = event.count

  index += step

  callback(null, {
    index,
    step,
    count,
    continue: index < count
  })
}

The reason we support a step size is that we may have multiple workers operating on the data in parallel. In this example we have a single worker, but in other cases we may need more in order to complete the overall work in a timely fashion.
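
For example (with made-up numbers), two workers could share the same count but start at offset indexes with a step of 2, so that between them they cover every index exactly once:

// Hypothetical configuration for two parallel workers sharing one count.
// Each worker gets its own iterator; the first index produced is index + step.
const workerA = { index: -2, step: 2 } // visits 0, 2, 4, ...
const workerB = { index: -1, step: 2 } // visits 1, 3, 5, ...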

IterateRecords

From there we move immediately into a Choice state. This state simply looks at the $.iterator.continue field; if it is not true then our iteration is over and we exit the loop. If iteration is not over, we move on to the worker tasks, which may use the $.iterator.index field to determine which unit of work to operate on.

ExampleWork

In this example this is just a Pass state, but in a real example this may represent a series of Tasks or Activities which process the data for this iteration. When completed, the last step in the series should point back to the Iterator state.

It’s also important to note that all states in this chain must use the ResultPath field to bucket their results in order to preserve the state of the iterator field throughout these states. Do not override the $.iterator or $.count fields while doing work or you may end up in an infinite loop or error condition.
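
For illustration, a worker for a single iteration might look something like this sketch (the part-naming scheme and the $.key input field are hypothetical); the Task state would bucket its result under something like $.result so that $.iterator and $.count survive:

// Hypothetical worker for one iteration: uses $.iterator.index to pick its
// unit of work and returns only its own result, leaving the iterator and
// count untouched so the loop can continue.
export function exampleWork (event, context, callback) {
  const { index } = event.iterator
  const partKey = `${event.key}.part-${index}` // hypothetical input field
  // ... read and process the part referenced by partKey ...
  callback(null, { success: true, part: partKey })
}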

Done

This state simply signifies the end of the step function.

New Job at Evolve

I’m happy to officially announce that I have accepted a job at a local start-up here in Minneapolis called Evolve.


We’re going to be a very small crew, working closely together to bring Evolve to the next level. I’m extremely excited to take this next step closer to my original passion: video games. I am also very excited to learn more about start-ups and what it takes to put them together and make them successful.

And if you want to play some games head over to my Evolve profile and add me as a friend!

paktc, an npm package

Just to learn more about the process, I created a small and semi-useful (ok, not-that-useful 🙂) npm package called paktc, which stands for “Press any key to continue…”.

It does exactly what the name implies: it prints out that message and then waits for you to press something before continuing. This is a standard thing added to default console applications in C#, and it’s useful for cases where you’re using an IDE (e.g. Visual Studio with the Node.js tools extension) that pops up a command prompt and then quickly closes it again before you can see the results.
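
The core idea is tiny; here’s a minimal sketch of the concept (not the package’s actual source):

// Minimal sketch of the idea: print the prompt, switch stdin to raw mode
// and wait for a single keypress before letting the process exit.
process.stdout.write('Press any key to continue...')
if (process.stdin.isTTY) process.stdin.setRawMode(true)
process.stdin.resume()
process.stdin.once('data', () => {
  if (process.stdin.isTTY) process.stdin.setRawMode(false)
  process.stdin.pause()
  process.stdout.write('\n')
})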

paktc (Press any key to continue…)

Prints ‘Press any key to continue..’ when in debug mode and waits for input before closing.

example

// your console application code here...

require('paktc') // Press any key to continue...

install

With [npm](http://npmjs.org) do:

npm install paktc

 

It was a very interesting experience to work with npm; it’s amazingly easy and simple to use. I can’t imagine any significant platform in the future not having a comparable package manager and still being successful. You should check it out if you haven’t already.

 

Ludum Dare April 2014

Here are some screenshots from my first attempt at a Ludum Dare challenge 🙂 It’s woefully incomplete and I don’t have any modeling skills, so I did it all with primitive shapes in Unity3D. But it’s fun!


In case you’re wondering what Ludum Dare is, the short version is that it’s a 48-hour game programming contest that you have to do completely alone, just for fun. There is a theme; this time the theme is “Beneath the Surface”.

Download | Source

Embedding Lua into a Windows Store app

I recently decided to do something a little fun and new. I wanted to make a DirectX-based Windows Store app that was primarily driven by Lua. Unsurprisingly, I couldn’t find a really solid demo of how to do this, so there were a number of hurdles to overcome.

DirectX

The first hurdle, of course, was to figure out how to actually render something in DirectX using the new WinRT APIs! I didn’t need anything too complicated, so I decided to go with the simple and easy-to-follow tutorial over at www.DirectXTutorial.com. The code I am using in this sample is essentially the first part of that tutorial with a Lua engine sitting on top of it. I highly recommend going through that tutorial if you’re not already an expert, and maybe pay for the premium content if you like it enough 🙂

Script API

After following the tutorial I had a working starting point. My next step was to write some code in Lua that would represent the API I wished I had. After several changes this is what I ended up with:

luadx:background({ 0.0, 0.2, 0.4, 1.0 })
luadx:scene("main", function (scene)
    scene:object("triangle", function (triangle)
        triangle:add("geometry", Geometry.Vertex({
            { 0.0, 0.5, 0.0 },
            { 0.45, -0.5, 0.0 },
            { -0.45, -0.5, 0.0 }
        }))
    end)
end)

The idea was that I wanted to create an API based on Scenes, GameObjects and Components rather than dealing with DirectX functions directly. In this example, the luadx object is actually a module imported into the global scope. It exposes a couple members: background and scene. The background function allows us to set the background color we want to use when redrawing the scene on each step of the render loop. The scene function creates a Scene object and passes it to the included function.

Once we have a Scene we can create GameObjects by calling the object function on the scene. This creates a GameObject, adds it to the Scene and passes it into the provided initialization function, where we can add components to it. In this small sample I only have one component type, which is a Vertex component. The Vertex component actually creates a vertex buffer and renders it during the render loop.

This is similar to the structure of how games are laid out in tools like Unity, though this sample is extremely simplistic. Unity is very interesting and worth looking at if you haven’t already. One thing to note is how this pattern relies on composition rather than inheritance: a scene is composed of game objects, a game object is composed of components and other game objects… In a real game engine you need a few more features to really facilitate composition, such as the ability to send messages between components. This sample isn’t that robust, but you can use your imagination.

Lua 5.2

The second big hurdle was how to actually get Lua and add it to your project. I ran into several problems, the worst of which was that the compiled versions of Lua available for download are not compatible with WinRT, which meant I had to download the source, modify it slightly and figure out how to compile it again in Visual Studio 2013. I will include the full source for my working sample with this post, but I will also try to explain what I had to do to get it to work.

The first thing I did was slightly modify luaconf.h.

#if WINAPI_FAMILY_PARTITION(WINAPI_FAMILY_DESKTOP_APP)
#define LUA_WINRT
#undef LUA_DL_DLL
#endif

We can use the family partition macro to determine whether we are in WinRT, create a new LUA_WINRT define and also undefine LUA_DL_DLL. Next I had to go into loadlib.c and modify the setpath function. The goal of this modification is to remove calls to getenv, which are not available in WinRT apps. Once we have this, the only trick is figuring out how to build Lua in Visual Studio 2013. For this you need to make a C static-library project and add the following preprocessor definitions: LUA_OPNAMES;_CRT_SECURE_NO_WARNINGS. After that, link your Store app against the lua52.lib file you built and you should be good to go!

Lua interop

Finally, the fun stuff! This is actually the bulk of the work, because I needed to learn quite a few new things: not only the Lua language and runtime itself, but also the C API for interoperating with it. Fortunately, while the Lua API is quite complex, it is also extremely clean and elegant. In my app the Lua interop starts in the Host class.

In most Lua samples you will see a helper macro called lua_open() being used to create the new lua_State, but this has been deprecated, so now you use luaL_newstate(). Now that you have your state you would typically call luaL_openlibs(L), but sadly not all of the libraries can be loaded into WinRT without further modifications. So instead you include them one by one:

luaopen_base(_state);
luaopen_table(_state);
luaopen_string(_state);
luaopen_math(_state);
luaopen_bit32(_state);
luaopen_coroutine(_state);
luaopen_debug(_state);
luaopen_package(_state);

These are all of the default packages I was able to load without trouble. Next I push the Host object into the Lua registry:

lua_pushlightuserdata(_state, this);
lua_setfield(_state, LUA_REGISTRYINDEX, LUA_HOST_REGISTRY_KEY);

This allows me to access the Host object from any native function called by Lua. This is handy because, as you’ll soon find out, Lua can really only call static functions in C++, so you need to store references to your application inside the Lua state itself. The Lua registry is interesting because it’s essentially a global table that isn’t accessible from script.

Next we import the custom luadx and Geometry modules using the luaL_requiref function. This puts the objects we create in the Require functions into the global scope with the given variable names, but it also makes them available via the require function. If this sample were any more complex I would probably not put these into the global scope, but would instead have the user require all needed modules.

Lua has a very interesting API. It’s centered around the concept of a stack, and most of the functions operate on it: you push values onto the stack and pop them off, and many of the functions make assumptions about what is expected to be at the top of the stack. You have to get quite comfortable with these ideas to work with Lua effectively. I have lots of examples in this project, but rather than go through them all I want to show a single function that I consider representative of how you work with Lua in general.

int Scene::New(lua_State *L)
{
    static const int Expected_Args[] = {
        LUA_TFUNCTION,
        LUA_TSTRING,
        LUA_TTABLE,
        LUA_TNONE
    };

    if (!Util::ValidateArgs(L, Expected_Args))
        return 0;

    auto name = lua_tostring(L, -2);
    lua_remove(L, -2);

    lua_getfield(L, LUA_REGISTRYINDEX, LUA_HOST_REGISTRY_KEY);
    auto host = (Host*) lua_touserdata(L, -1);
    lua_pop(L, 1);

    auto scene = (Scene*) lua_newuserdata(L, sizeof(Scene));
    auto scene_ref = luaL_ref(L, LUA_REGISTRYINDEX);
    scene->Initialize(name, scene_ref);
    host->Add(scene);

    // Set scene metatable...
    lua_rawgeti(L, LUA_REGISTRYINDEX, scene_ref);
    if (luaL_newmetatable(L, LUA_SCENE_TYPENAME) == 1)
    {
        static const luaL_Reg Scene_Members[] = {
            { "object", &GameObject::New },
            { NULL, NULL }
        };

        // Initialize metatable...
        lua_pushstring(L, "__index");
        luaL_newlib(L, Scene_Members);
        lua_settable(L, -3);

        lua_pushstring(L, "__gc");
        lua_pushcfunction(L, &Scene::Gc);
        lua_settable(L, -3);
    }

    lua_setmetatable(L, -2);
    lua_call(L, 1, 0);
    return 0;
}

This is the function that is called when luadx:scene is called from script. The first thing we do in this function is attempt to validate the provided arguments. I created a little helper function called Util::ValidateArgs, which looks at the top items on the stack and verifies that they match the types I am expecting. If they do not match, lua_error is called with a string describing what went wrong.

static const int Expected_Args[] = {
    LUA_TFUNCTION,
    LUA_TSTRING,
    LUA_TTABLE,
    LUA_TNONE
};

if (!Util::ValidateArgs(L, Expected_Args))
    return 0;

Next we retrieve the name from the first argument:

auto name = lua_tostring(L, -2);
lua_remove(L, -2);

Note that we are using the index value -2, which means that we are accessing and removing the second element from the top of the stack. If you were to use +2 you would instead be accessing the second element from the bottom of the stack. Since the name is our first argument it sits second from the top; the value at -1 is the function, which is our second argument. It can be a little confusing at first to deal with negative indices, but you get used to it pretty fast. I created a very useful Util::Dump(L) function that prints out the types of the objects on the stack so I can do a sanity check whenever I want.

Next we need to get a reference to the Host object, which we can do by pulling it out of the registry. The lua_getfield function can be used to read a uniquely named slot in the Lua registry and pull the host out of it. The value is pushed onto the stack, where we can convert it to a pointer by calling lua_touserdata before popping it back off.

lua_getfield(L, LUA_REGISTRYINDEX, LUA_HOST_REGISTRY_KEY);
auto host = (Host*) lua_touserdata(L, -1);
lua_pop(L, 1);

Next we want to create a new Scene and add it to the host.

auto scene = (Scene*) lua_newuserdata(L, sizeof(Scene));
auto scene_ref = luaL_ref(L, LUA_REGISTRYINDEX);
scene->Initialize(name, scene_ref);
host->Add(scene);

Here I am also getting a reference to the Lua object that points to the Scene. I can use this later to push the object onto the stack and call members on it or perform other operations on it. This is how you would call back into a Lua script function from native code for example.

Next we attempt to set a metatable onto the object.

lua_rawgeti(L, LUA_REGISTRYINDEX, scene_ref);
if (luaL_newmetatable(L, LUA_SCENE_TYPENAME) == 1)
{

When you call luaL_newmetatable it will return 1 if it actually creates the metatable and 0 if the table already exists. In both cases it will push it onto the stack. We really want a metatable here because it allows us to tap into some special functions for the object, namely the __index and __gc functions. The __gc function will be called if the object is collected, and we can then perform any custom operations for releasing memory we may have allocated. The __index member allows us to attach callbacks to native functions onto our script object. In the case of the scene we want to add an “object” function. When you attempt to access a field in Lua and that field doesn’t exist, the runtime will call into the __index function on the object’s metatable and allow it to handle the access instead. By adding a table to the __index property, the Lua runtime will automatically redirect those lookups to that table’s members. It’s basically a slightly more robust version of JavaScript’s prototypal inheritance.

static const luaL_Reg Scene_Members[] = {
    { "object", &GameObject::New },
    { NULL, NULL }
};

// Initialize metatable...
lua_pushstring(L, "__index");
luaL_newlib(L, Scene_Members);
lua_settable(L, -3);

lua_pushstring(L, "__gc");
lua_pushcfunction(L, &Scene::Gc);
lua_settable(L, -3);

Finally we set the metatable on the Scene userdata, then call the function that was passed in as the second argument, with the Scene left on top of the stack as its single argument, and return 0, which tells Lua that this C function returns no values to the calling script.

lua_setmetatable(L, -2);
lua_call(L, 1, 0);
return 0;

And that is about it; the rest is just variations on this theme. You can push objects containing pointers onto the stack and store them for reference later. You can pull pointers out and call functions on them. You can push values, throw errors and debug into the runtime. It’s really quite fun and surprisingly easy to do!

Conclusion

Lua is pretty fun and not that hard to embed into your application. It’s not totally cookie-cutter on Windows, but it’s solvable. As far as C code goes, Lua is incredibly clean and really a quintessential example of how to do native code right.

Download

Full Code: luadx.zip