Event Aggregation Workflow Via Durable Functions

Recently at work… we came across a workflow, where we had to aggregate certain type of events by leveraging Durable Functions.

The use case was something like the this:

When a certain primary event is received by a workflow step, that step has to wait for all of its dependencies to arrive before proceeding further with the next one. However, if a certain threshold timeout occurs, then that step can skip waiting for its dependencies.

I think the following sequence diagram would help explain this further:

image

In the above diagram, when Event X (primary event) is received on a topic, the Event Aggregator step starts a threshold timer (24 hours) in this case and then simultaneously waits for all of its dependencies to arrive (on different topics).

If all of the dependencies arrive within that threshold, the Event Aggregator moves on and publishes its Completed status to the next topic.

However if a timeout occurs, then the Event Aggregator abandons the event aggregation and publishes its Incomplete status.

I wanted to test this out using a small PoC and have uploaded that code here

NOTE: The following code has some similarities to one of my previous post. You can read more details there or go to official docs

Now, lets dive into some of the interesting parts of EventAggregatorOrchestrator class:

  • First, defining the events and its dependencies in a Dictionary is shown below:
private static readonly ConcurrentDictionary<string, List<string>> dependencies;

static EventAggregatorOrchestrator()
{
	dependencies = new ConcurrentDictionary<string, List<string>>();
	dependencies.TryAdd("Event A", new List<string>() { "A-1", "A-2" });
	dependencies.TryAdd("Event B", new List<string>() { "B-1" });
	dependencies.TryAdd("Event C", new List<string>() { "" });
}

  • Next, our Durable Client functions which are the entry points for receiving all (primary & dependencies) events via EventGrid.

[FunctionName("Event-Subscriber")]
public async Task ReceiveEventsAsync([EventGridTrigger] EventGridEvent eventGridEvent, [DurableClient] IDurableClient client)
{
	Logger.LogInformation($"Received Event: {eventGridEvent.Data.ToString()}");
	await ReceiveOrUpdateEventsAsync(eventGridEvent, client);
}

[FunctionName("Dependencies-Subscriber")]
public async Task ReceiveDependenciesAsync([EventGridTrigger] EventGridEvent eventGridEvent, [DurableClient] IDurableClient client)
{
	Logger.LogInformation($"Received Dependency: {eventGridEvent.Data.ToString()}");
	await ReceiveOrUpdateEventsAsync(eventGridEvent, client);
}

The above functions end up calling a helper method ReceiveOrUpdateEventsAsync which has the logic for managing the Orchestrator function. This logic either starts a new orchestration, or raises an external event to resume a running orchestrator. The below snippet shows that logic:


private async Task ReceiveOrUpdateEventsAsync(EventGridEvent eventGridEvent, IDurableClient client)
{
	// Get orchestration Status
	var orchestration = await client.GetStatusAsync(eventGridEvent.Subject); // Subject is Unique for Testing
	if (orchestration == null)
	{
		var instance = await client.StartNewAsync(@"Event-Aggregator-Orchestrator", eventGridEvent.Subject, eventGridEvent);
		Logger.LogInformation($"Started new Orchestration instance {instance} for {orchestration}");
	}
	else
	{
		_ = orchestration.RuntimeStatus switch
		{
			OrchestrationRuntimeStatus.Running => RaiseEventForOrchestration(),
			_ => StartOrchestration()
		};
	}

	async Task RaiseEventForOrchestration()
	{
		await client.RaiseEventAsync(orchestration.InstanceId, @"Event-Aggregator-Orchestrator", eventGridEvent);
		Logger.LogInformation($"Received dependency event for {orchestration}");
	}

	async Task StartOrchestration()
	{
		var instance = await client.StartNewAsync(@"Event-Aggregator-Orchestrator", eventGridEvent.Subject, eventGridEvent);
		Logger.LogInformation($"Started new Orchestration instance {instance} for {orchestration}");
	}
}

[FunctionName("Event-Aggregator-Orchestrator")]
public async Task AggregateEventsAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
{
	try
	{
		// Get the list of dependencies to aggregate for a given event
		var receivedEvent = context.GetInput<EventGridEvent>();
		string status = "";

		// Check if any dependencies     
		if (dependencies.TryGetValue(receivedEvent.Subject.ToString(), out List<string> dependenciesList))
		{
			if (dependenciesList.Any())
			{
				using var cts = new CancellationTokenSource();
				
				// Create timer to wait for dependencies to be received
				var endTime = context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(30)); // Durable Timer                         
				var timeoutTask = context.CreateTimer<List<string>>(endTime, default, cts.Token);

				// Start tracking dependencies
				var dependenciesRemaining = dependenciesList.ToList();
				while (dependenciesRemaining.Any())
				{
					// wait for dependencies to arrive
					var dependenciesTask = context.WaitForExternalEvent<EventGridEvent>(@"Event-Aggregator-Orchestrator");
					var completedTask = await Task.WhenAny(timeoutTask, dependenciesTask);
					if (completedTask == dependenciesTask)
					{
						if (dependenciesTask.Result != null)
						{
							dependenciesRemaining.Remove(dependenciesTask.Result.EventType);
							if (dependenciesRemaining.Count == 0)
							{
								// All dependencies received
								status = "All dependencies received!";
								cts.Cancel();
								break;
							}
						}
					}
					else if (completedTask == timeoutTask)
					{
						// Timeout
						status = $"Timeout Occured, dependencies not received: {dependenciesList.Count}";
						break;
					}
				}
			}
			else
			{
				status = "No dependencies, so moving on!";
			}
		}
		//
		await context.CallActivityAsync(@"Event-Status-Publisher", status);
	}
	catch (TaskCanceledException ex)
	{
		if (!context.IsReplaying)
			Logger.LogError(ex.ToString());
	}
	catch (Exception ex)
	{
		if (!context.IsReplaying)
			Logger.LogError(ex.ToString());
	}
}

Some of the key points to note in the above snippet are creation of Durable Timer via context.CreateTimer.

Next is waiting for the dependencies collection via external event using context.WaitForExternalEvent.

Also, since the orchestration history is replayed I had to be careful of not introducing any non-deterministic logic. Read more about that here.

Finally, once the Event Aggregation finishes, it ends up publishing its status via an Activity Function which is shown below:


[FunctionName("Event-Status-Publisher")]
public void PublishStatus([ActivityTrigger] string status)
{
	Logger.LogInformation(status);
}

Running the workflow for both use cases (timeout and all dependencies received) is shown below:

image

image

image

image

So, there you see folks! Implementing this durable workflow was very convenient, It takes a little used to adhering to Orchestration code constraints but I guess with more practice, you get used to it.

In the next post, I’ll try to take this further and see if I can implement this using Durable Entities to compare and contrast the implementation.

Related Posts