The previous post in this series described the high-level idea of running asynchronous processes with an Activity Task used for synchronization, and compared it with several other solutions to the same problem. This post discusses the details of the implementation.
You can check out the first part of this series here. The illustration of the algorithm from the previous part is presented again to make the code easier to follow:
Keep in mind that this illustration alone probably won’t be sufficient to understand the technique, so it is recommended to read the previous part first.
An example project with two State Machines is available on our GitHub. It uses Terraform for the infrastructure and Python for the Lambda code; porting it to other languages should be straightforward.
Start asynchronous action Lambda
The code of the Start asynchronous action Lambda is shown below:
It is important to note that the event argument is ignored here. This Lambda is invoked only to start the asynchronous action for an Activity, using the input taken from the Activity Task. The output of the Lambda is also ignored in the Execution; nested_execution_arn is returned only for debugging purposes.
The second thing is handling the case when GetActivityTask does not return any Task. This is rather unlikely, but it should still be handled: in such a case we should retry the action.
The third thing is the format of the input for the inner State Machine. We pass JSON with only two fields: task_token and state. The task_token field holds the token for the Activity, while the state field holds the input for the inner Execution. We will show later how the inner State Machine can be structured so that it stays mostly unaware of being part of the outer State Machine, despite the obvious change in the input structure.
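As a rough illustration of the three points above, a minimal sketch of such a Lambda could look like the following. It is not the code from the example project: the environment variable names `ACTIVITY_ARN` and `STATE_MACHINE_ARN`, the `NoActivityTaskError` exception and the `start_async_action` helper are assumptions made for this sketch, while `get_activity_task` and `start_execution` are regular boto3 Step Functions client calls.

```python
import json
import os


class NoActivityTaskError(Exception):
    """Hypothetical error raised when GetActivityTask returns no Task."""


def start_async_action(sfn_client, activity_arn, state_machine_arn):
    # Long-poll for the Task scheduled by the Activity State of the outer Execution.
    task = sfn_client.get_activity_task(activityArn=activity_arn)
    task_token = task.get("taskToken")
    if not task_token:
        # No Task was available; raising lets a Retry configuration run the Lambda again.
        raise NoActivityTaskError("GetActivityTask returned no Task")

    # Wrap the Activity input so that the inner Execution also receives the token.
    inner_input = {"task_token": task_token, "state": json.loads(task["input"])}
    execution = sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(inner_input),
    )
    # Returned only for debugging; the outer Execution ignores this output.
    return {"nested_execution_arn": execution["executionArn"]}


def lambda_handler(event, context):
    # The event argument is deliberately ignored.
    import boto3  # imported here so the helper above can be exercised without AWS

    return start_async_action(
        boto3.client("stepfunctions"),
        os.environ["ACTIVITY_ARN"],       # hypothetical variable name
        os.environ["STATE_MACHINE_ARN"],  # hypothetical variable name
    )
```

Passing the client into the helper is only a convenience for local testing with a stub; a single handler function would work just as well.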
Retrying GetActivityTask upon failure
We are using a simple Retry configuration to retry getting the Activity Task.
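To give an idea of what such a configuration may look like, here is a sketch of a Task State with a Retry block, written as a Python dictionary and printed as JSON. The error name `NoActivityTaskError`, the placeholder ARN and the retry values are assumptions for illustration, not the settings from the example project; `ErrorEquals`, `IntervalSeconds`, `MaxAttempts` and `BackoffRate` are standard Amazon States Language fields.

```python
import json

start_async_action_state = {
    "Type": "Task",
    # Placeholder ARN, not a real resource.
    "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:start-async-action",
    "Retry": [
        {
            # Hypothetical error name raised by the Lambda when no Task was returned.
            "ErrorEquals": ["NoActivityTaskError"],
            "IntervalSeconds": 1,   # illustrative value
            "MaxAttempts": 3,       # illustrative value
            "BackoffRate": 2.0,     # illustrative value
        }
    ],
    "End": True,
}

print(json.dumps(start_async_action_state, indent=2))
```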
Decoupling State Machine from Parallel output
You may have noticed that we use a Parallel State to run the Lambda and the Activity in parallel. Although this is very convenient, a Parallel State returns an array of results to the next States. That is useful in typical cases, but in this context we would prefer to get only the result of the Activity State, since the result of the Lambda that starts the inner Execution has merely debugging value. We want the Parallel State output to consist of its input along with the result of the inner Execution, so that the effect is the same as if a Task State were used instead of the nested State Machine. The manipulation of inputs and outputs in Step Functions is described in the documentation. We achieve this by using the OutputPath in the Parallel State, and the ResultPath along with the InputPath in the Activity Task.
For reference, you may want to look at the outer State Machine definition on GitHub.
So let’s assume that our outer Execution has the following input:
We want to pass async_action_arguments to the inner Execution and put its result in the async_result field without modifying the existing fields.
These requirements can be met by specifying the following InputPath and ResultPath for the Activity Task in the State Machine:
The InputPath tells Step Functions to “take async_action_arguments field from JSON and then pass it as an input for this State”. The meaning of the ResultPath is: “take the result of the State and save it as the async_result field in the original input (before applying the InputPath)”.
So the input actually received by the Activity Task (Wait for the asynchronous action) will look like:
And the result of the Activity Task will have the following structure:
There is one problem – the output of the Parallel State is an array and we would like to directly set the output of the Activity Task as the whole Parallel State output. Fortunately, it can be easily solved by setting the OutputPath property of the Parallel State as follows:
This assumes that the Activity Task is the second branch within the Parallel State. Thanks to that, the output of the Activity Task will be forwarded as the Parallel State’s output.
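The interplay of the three paths can be easier to follow in code. The sketch below is a toy simulation, not Step Functions itself: `get_path`, `set_path` and `run_state` are hypothetical helpers that support only `$`, `$.field` and `$[index]`, and the sample input and fake results are made up. The path values `$.async_action_arguments`, `$.async_result` and `$[1]` are assumed from the description above.

```python
import copy


def get_path(document, path):
    """Resolve a tiny subset of JSONPath: '$', '$.field' and '$[index]'."""
    if path == "$":
        return document
    if path.startswith("$."):
        return document[path[2:]]
    if path.startswith("$[") and path.endswith("]"):
        return document[int(path[2:-1])]
    raise ValueError(f"unsupported path: {path}")


def set_path(document, path, value):
    """Return the value itself for '$', or a copy of document with '$.field' set."""
    if path == "$":
        return value
    updated = copy.deepcopy(document)
    updated[path[2:]] = value
    return updated


def run_state(raw_input, work, input_path="$", result_path="$", output_path="$"):
    effective_input = get_path(raw_input, input_path)    # InputPath selects the input
    result = work(effective_input)                       # the State does its work
    combined = set_path(raw_input, result_path, result)  # ResultPath merges into raw input
    return get_path(combined, output_path)               # OutputPath filters the output


def lambda_branch(parallel_input):
    # First branch: starts the inner Execution, returns a debugging value only.
    return {"nested_execution_arn": "arn:hypothetical"}


def activity_branch(parallel_input):
    # Second branch: the Activity Task; the lambda stands in for the inner Execution.
    return run_state(
        parallel_input,
        lambda arguments: {"echo": arguments},
        input_path="$.async_action_arguments",
        result_path="$.async_result",
    )


def parallel_state(parallel_input):
    # A Parallel State returns one output per branch, in branch order.
    return [lambda_branch(parallel_input), activity_branch(parallel_input)]


execution_input = {"existing_field": "untouched", "async_action_arguments": {"value": 1}}
parallel_output = run_state(execution_input, parallel_state, output_path="$[1]")
print(parallel_output)
```

With these assumptions, the Parallel State output keeps `existing_field` and `async_action_arguments` intact and gains an `async_result` field, which is the shape a plain Task State would have produced.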
Handling errors within inner State Machine and decoupling it from invocation details
Here the goal is to build the inner State Machine so that it sends the correct result to the outer State Machine (be it success or failure) even when errors occur, without forcing us to modify the inner State Machine significantly.
Handling errors can be easily solved by applying the Try-Catch pattern for Step Functions. In the example project we are using exactly the same pattern to handle errors and send success / failure notifications appropriately.
However, the second issue still applies. Let’s take a look at the inner State Machine input for a bit of context:
Inside the inner State Machine we would like to work on the contents of the state field. The task_token field is only an implementation detail. To solve this problem we will once again make use of the InputPath and ResultPath properties. Here, we can assume that the part of the State Machine outside the Parallel State is “technical” and we can afford to handle a few implementation details there (it can be reused across many State Machines). However, inside the Parallel State (inside the “Try” block) we shouldn’t be bothered with such details.
By specifying the InputPath and the ResultPath for the Parallel State as follows, we pass only the state field to the contents of the Parallel State and save its result in the same field:
This way we solved our second issue – inside the Parallel State, we will get the following input:
You may have noticed that this JSON is exactly the same as the input for the Activity Task. And you are right! It will always be the exact same JSON because the input for the Activity Task is placed in the state field of the inner State Machine input by the Lambda function (Start asynchronous action). The input is nested only because we need to pass the Task Token along with the original input.
The final output of the Parallel State (“Try” block) will have the following structure:
The state field is an array. Unfortunately, the OutputPath works on the final result (after the ResultPath has been processed), so we cannot use it to unwrap the result from the array. Instead, we simply take the first element of that array inside the Send success State. It is not the clearest solution, but its scope is small and simple.
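The data shapes described in this section can be traced step by step in plain Python. This is only an illustration with made-up values (the token, the `value` field and the `processed` flag are hypothetical), assuming both the InputPath and the ResultPath of the Parallel State are `$.state`.

```python
# Input of the inner Execution, as built by the Start asynchronous action Lambda.
inner_input = {"task_token": "hypothetical-token", "state": {"value": 1}}

# InputPath "$.state": the "Try" block only sees the original Activity input.
try_block_input = inner_input["state"]

# A Parallel State returns a list with one output per branch (a single branch here).
branch_outputs = [{**try_block_input, "processed": True}]

# ResultPath "$.state": the list replaces the state field, task_token is preserved.
send_success_input = {**inner_input, "state": branch_outputs}

# Send success unwraps the single element before reporting to the outer Execution.
result_for_outer_execution = send_success_input["state"][0]
print(result_for_outer_execution)
```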
Send success and Send failure
The implementation of the Lambdas for both these Tasks is rather simple. Send success is presented below. As mentioned previously, the first element of the state array is sent as the result.
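A minimal sketch of such a Lambda, assuming the event has the task_token and state fields described earlier, might look like this. The `send_success` helper is a name introduced for this sketch; `send_task_success` is the boto3 Step Functions client call.

```python
import json


def send_success(event, sfn_client):
    # The "Try" block is a Parallel State, so its result is a list; unwrap it here.
    result = event["state"][0]
    sfn_client.send_task_success(
        taskToken=event["task_token"],
        output=json.dumps(result),  # the output must be a JSON string
    )
    return result


def lambda_handler(event, context):
    import boto3  # imported here so send_success can be tested with a stub client

    return send_success(event, boto3.client("stepfunctions"))
```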
In the Send failure State we are sending error info to the outer Execution. This way we can handle errors in the outer State Machine (by using the Error name) and it simplifies troubleshooting because we have access to the Cause directly in the outer Execution.
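The failure counterpart can be sketched in the same way. Where the caught error ends up in the event depends on the Catch configuration; the sketch below assumes it is stored under a hypothetical `error` field containing the `Error` and `Cause` values that Step Functions produces for a caught error. `send_task_failure` is the boto3 Step Functions client call.

```python
def send_failure(event, sfn_client):
    # Assumed location of the caught error; adjust to the actual Catch ResultPath.
    error_info = event["error"]
    sfn_client.send_task_failure(
        taskToken=event["task_token"],
        error=error_info["Error"],  # error name, usable in the outer Retry / Catch
        cause=error_info["Cause"],  # human-readable details for troubleshooting
    )


def lambda_handler(event, context):
    import boto3  # imported here so send_failure can be tested with a stub client

    send_failure(event, boto3.client("stepfunctions"))
```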
The solution presented here has been tested and it works. There are, however, a few points where it could be improved to increase the reliability of the whole solution. Some of them may not be applicable or may not make sense in every use case, so keep that in mind when deciding whether to implement them.
Separate GetActivityTask from StartExecution
As mentioned earlier, it may be beneficial to split the Start asynchronous action into two States. One would be responsible for getting the Task Token and the other would start the inner Execution. This way, even if starting the inner Execution fails, we still have access to the Task Token and can easily retry.
Add timeout in outer State Machine
To protect ourselves from the case when the inner Execution takes too long or has silently failed (for whatever reason), we may want to add a timeout to the Activity State. It also allows us to handle the situation gracefully, for example by informing the user, trying to cancel the asynchronous process if possible, or automatically creating a support ticket. The way we handle this depends highly on our domain.
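As a sketch, a timeout on the Activity State could be declared as below (written as a Python dictionary and printed as JSON). The placeholder ARN, the one-hour value and the `Handle timeout` State name are assumptions for illustration; `TimeoutSeconds` and the `States.Timeout` error name are standard Amazon States Language features.

```python
import json

wait_for_async_action_state = {
    "Type": "Task",
    # Placeholder ARN, not a real resource.
    "Resource": "arn:aws:states:REGION:ACCOUNT_ID:activity:hypothetical-activity",
    "InputPath": "$.async_action_arguments",
    "ResultPath": "$.async_result",
    "TimeoutSeconds": 3600,  # illustrative value: fail if no result arrives within an hour
    "Catch": [
        {
            "ErrorEquals": ["States.Timeout"],
            "Next": "Handle timeout",  # hypothetical State handling the timeout
        }
    ],
    "End": True,
}

print(json.dumps(wait_for_async_action_state, indent=2))
```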
Make use of heartbeats
Step Functions allows us to specify heartbeats for an Activity State. This way we can require the asynchronous process to send a heartbeat every N seconds; otherwise a timeout error will be raised. This is a really nice way to ensure that the asynchronous process is alive. However, such a process needs to be aware that it should send heartbeats, which is really hard to implement if our nested process is a State Machine.
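For an asynchronous process that is ordinary code rather than a State Machine, sending heartbeats could be sketched as follows. The `send_heartbeats` helper, its parameters and the 30-second interval are hypothetical; `send_task_heartbeat` is the boto3 Step Functions client call, and the interval should stay below the `HeartbeatSeconds` value set on the Activity State.

```python
import time


def send_heartbeats(sfn_client, task_token, is_running, interval_seconds=30, sleep=time.sleep):
    """Report liveness for task_token until is_running() returns False."""
    sent = 0
    while is_running():
        sfn_client.send_task_heartbeat(taskToken=task_token)
        sent += 1
        sleep(interval_seconds)
    return sent
```

The `sleep` parameter exists only so that the loop can be exercised without actually waiting.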
The solution presented here, running asynchronous processes using Activities without workers, simplifies the infrastructure by removing the need to create workers and servers for use cases that do not really require them. It may also be considered an interesting approach because even the official documentation for the GetActivityTask endpoint mentions workers, not the generic use case of any asynchronous task.
It adds a great deal of flexibility in designing systems with Step Functions by allowing us to start State Machines from within other State Machines and wait for their execution. The started State Machine does not have to be defined upfront, so it is possible to construct complex workflows dynamically.
Even though running nested State Machines may not be useful in your case, I hope that this article has shown you a different way of using Activities in Step Functions, one that makes it possible to implement specific processes gracefully.
Feel free to comment and share your thoughts. Do not forget to check out the example project on our GitHub!