Building Complex Workflows with Azure Functions and Durable Functions: A Step-by-Step Guide with Examples
Azure Functions is a popular serverless computing platform for building and deploying event-driven applications. It lets developers create small, single-purpose functions that execute in response to events. Not every application fits that model, though: some require long-running workflows that involve multiple steps and can span hours, days, or even weeks. This is where Durable Functions comes in. Durable Functions is an extension of Azure Functions for writing long-running workflows in a serverless environment. In this article, we will explore how to use Durable Functions to build complex, multi-step processes.
What are Durable Functions?
Durable Functions is an extension of Azure Functions that enables developers to write stateful, long-running workflows in a serverless environment. It lets you express complex, multi-step processes as code, including branching, conditional logic, and error handling, and it provides a foundation for building scalable, reliable serverless applications.
How do Durable Functions work?
Durable Functions works by composing several functions into a single long-running workflow. The workflow is kicked off by a client (starter) function, which can be triggered by an ordinary event such as an HTTP request, a message on a queue, or a timer. The starter launches the orchestrator function, which is responsible for starting and controlling the workflow: it calls other functions, known as activity functions, to perform the individual tasks. Activity functions can be written in any of the languages that Durable Functions supports, including C#, Java, JavaScript, and Python.
Durable Functions uses checkpoints to ensure that a workflow can survive failures. As an orchestration makes progress, its state and execution history are checkpointed to Azure Storage, which provides durability and reliability. If a function fails or the host is restarted, the orchestrator replays from that stored history and resumes at the last checkpoint rather than starting over.
Durable Functions is built on the Durable Task Framework, a set of building blocks for creating complex workflows. This foundation provides features such as durable timers, sub-orchestrations, and human-interaction patterns based on external events.
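As an illustration of those building blocks, the following sketch shows the common human-interaction pattern: the orchestrator races a durable timer against an external event and acts on whichever completes first. The event name "Approval" and the RequestApproval, ProcessApproval, and Escalate activities are hypothetical placeholders, and the code assumes the in-process C# programming model (Microsoft.Azure.WebJobs.Extensions.DurableTask).
[FunctionName("ApprovalOrchestrator")]
public static async Task RunApproval(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    // Hypothetical activity that asks a person to approve something.
    await context.CallActivityAsync("RequestApproval", null);

    using (var cts = new CancellationTokenSource())
    {
        // Durable timer: fires 24 hours from the orchestration's (replay-safe) current time.
        DateTime deadline = context.CurrentUtcDateTime.AddHours(24);
        Task timeout = context.CreateTimer(deadline, cts.Token);

        // External event: raised later by a client via RaiseEventAsync("Approval", ...).
        Task<bool> approval = context.WaitForExternalEvent<bool>("Approval");

        if (await Task.WhenAny(approval, timeout) == approval && approval.Result)
        {
            cts.Cancel(); // cancel the pending timer so the orchestration can complete
            await context.CallActivityAsync("ProcessApproval", null);
        }
        else
        {
            await context.CallActivityAsync("Escalate", null);
        }
    }
}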
How to use Durable Functions?
To use Durable Functions, you need an Azure subscription. Once you have one, you can create an Azure Functions app in the Azure portal, add a new function, and select one of the Durable Functions templates. The template creates an orchestrator function and a starter (client) function; the starter uses the durable client to schedule a new instance of the orchestrator, which starts the workflow.
Once you have created the orchestrator function, you can add activity functions to perform the individual tasks in the workflow. Activity functions can be written in any of the languages that Durable Functions supports. Unlike the starter, they are not triggered directly by HTTP requests, queue messages, or timers; they are invoked by the orchestrator through the activity trigger.
Durable Functions provides a way to build complex, multi-step processes. For example, you can chain steps such as data ingestion, data processing, and data visualization into a single workflow, and that workflow can span hours, days, or even weeks, as sketched below.
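A minimal sketch of such a chained workflow might look like this; the activity names IngestData, ProcessData, and BuildVisualization are hypothetical placeholders for your own activity functions:
[FunctionName("DataPipelineOrchestrator")]
public static async Task<string> RunPipeline(
    [OrchestrationTrigger] IDurableOrchestrationContext context)
{
    string source = context.GetInput<string>();

    // Function chaining: each step is an activity, and each output feeds the next step.
    string rawData = await context.CallActivityAsync<string>("IngestData", source);
    string processed = await context.CallActivityAsync<string>("ProcessData", rawData);
    string reportUrl = await context.CallActivityAsync<string>("BuildVisualization", processed);

    return reportUrl;
}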
Example: Building a Multi-Step Workflow with Durable Functions
To illustrate the power of Durable Functions, let’s walk through an example of building a multi-step workflow that involves parallel execution and error handling. Our workflow will perform the following steps:
- Receive a list of URLs to scrape
- Scrape each URL in parallel
- Store the results in a database
- Send a notification when the job is complete
Step 1: Receiving a List of URLs
We’ll start by creating an HTTP-triggered function that receives a list of URLs to scrape. This function will validate the input and pass it to the orchestrator function.
[FunctionName("StartScrapeWorkflow")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
List<string> urls = JsonConvert.DeserializeObject<List<string>>(requestBody);
if (urls == null || urls.Count == 0)
{
return new BadRequestObjectResult("Please pass a list of valid URLs in the request body.");
}
string instanceId = await starter.StartNewAsync("ScrapeOrchestrator", urls);
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
return new OkObjectResult($"Started orchestration with ID = '{instanceId}'.");
}
In this code, we define an HTTP-triggered function called StartScrapeWorkflow that receives an HTTP POST request with a list of URLs in the request body. After validating the input, we use the durable client binding to start a new instance of the ScrapeOrchestrator function, passing in the list of URLs as input.
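As a possible refinement, the durable client also exposes a CreateCheckStatusResponse helper; returning its result from the starter instead of a plain message gives callers the built-in status-query URLs they can poll to track the orchestration's progress.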
Step 2: Scraping URLs in Parallel
Next, we’ll create the ScrapeOrchestrator function, which is responsible for scraping each URL in parallel. The orchestrator fans out by calling a ScrapeUrl activity function for each URL and then calls another activity function to store the collected results in a database.
[FunctionName("ScrapeOrchestrator")]
public static async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
List<string> urls = context.GetInput<List<string>>();
var tasks = new List<Task<string>>();
foreach (string url in urls)
{
Task<string> task = context.CallActivityAsync<string>("ScrapeUrl", url);
tasks.Add(task);
}
await Task.WhenAll(tasks);
var results = new List<string>();
foreach (Task<string> task in tasks)
{
if (task.IsCompletedSuccessfully)
{
results.Add(task.Result);
}
}
await context.CallActivityAsync("StoreResultsInDatabase", results);
return results;
}
In this code, we define an orchestrator function called ScrapeOrchestrator that receives the list of URLs as input. For each URL, the CallActivityAsync method schedules the ScrapeUrl activity function, which scrapes a single URL and returns the result as a string; the returned tasks are collected in a list.
We await Task.WhenAll to run all of the scrapes in parallel (the fan-out/fan-in pattern). Because Task.WhenAll throws if any activity fails, the await is wrapped in a try/catch so the orchestrator can still gather the results of the activities that succeeded. Finally, we use CallActivityAsync again to invoke the StoreResultsInDatabase activity function, which stores the results in a database.
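The ScrapeUrl activity itself is not shown above. A minimal sketch, assuming a plain HTTP GET is enough to count as "scraping" and using a shared static HttpClient, might look like this:
private static readonly HttpClient httpClient = new HttpClient();

[FunctionName("ScrapeUrl")]
public static async Task<string> ScrapeUrl(
    [ActivityTrigger] string url,
    ILogger log)
{
    log.LogInformation($"Scraping {url}");

    // Download the page content; if the request fails, the exception surfaces to the
    // orchestrator, which treats this activity's task as faulted and skips its result.
    HttpResponseMessage response = await httpClient.GetAsync(url);
    response.EnsureSuccessStatusCode();
    return await response.Content.ReadAsStringAsync();
}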
Step 3: Storing Results in a Database
The StoreResultsInDatabase function is a simple function that takes a list of results and stores them in a database. We won’t go into the details of this function, but here’s an example implementation using the Azure Cosmos DB SQL API:
[FunctionName("StoreResultsInDatabase")]
public static async Task Run(
[ActivityTrigger] List<string> results,
[CosmosDB(
databaseName: "my-db",
collectionName: "results",
ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<dynamic> documents)
{
foreach (string result in results)
{
await documents.AddAsync(new { id = Guid.NewGuid().ToString(), result = result });
}
}
In this code, we define an activity function called StoreResultsInDatabase that takes a list of results as input. We use the CosmosDB attribute to specify the database and collection where the results should be stored, and we use the IAsyncCollector interface to insert documents into the database.
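Note that CosmosDBConnection is the name of an app setting (or local.settings.json entry) that holds the Cosmos DB connection string, not the connection string itself. The attribute shown here uses the older Cosmos DB bindings syntax; newer versions of the extension rename some of these properties (for example, containerName and Connection), so adjust to match the extension version you have installed.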
Step 4: Sending a Notification
Finally, we’ll create a function that sends a notification when the job is complete. We’ll use the SendGrid API to send an email to a specified address.
[FunctionName("SendNotification")]
public static async Task Run(
[TimerTrigger("0 */5 * * * *")] TimerInfo myTimer,
[DurableClient] IDurableOrchestrationClient client,
ILogger log)
{
string instanceId = Environment.GetEnvironmentVariable("INSTANCE_ID");
DurableOrchestrationStatus status = await client.GetStatusAsync(instanceId);
if (status != null && status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
{
string email = Environment.GetEnvironmentVariable("EMAIL_ADDRESS");
var message = new SendGridMessage
{
From = new EmailAddress("noreply@myapp.com"),
Subject = "Scraping Job Complete",
HtmlContent = "The scraping job has completed successfully."
};
message.AddTo(new EmailAddress(email));
var client = new SendGridClient(Environment.GetEnvironmentVariable("SENDGRID_API_KEY"));
var response = await client.SendEmailAsync(message);
log.LogInformation($"Sent email notification to {email}. Response status code: {response.StatusCode}");
}
}
In this code, we define a timer-triggered function called SendNotification that runs every 5 minutes. We use the Durable Functions extension to get the status of the orchestration instance specified by the INSTANCE_ID environment variable.
If the job has completed successfully, we use the SendGrid API to send an email to the address specified by the EMAIL_ADDRESS environment variable. We create a SendGridMessage object and set the sender, subject, and body of the email. We then use the SendGridClient object to send the email.
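One design note: polling on a timer requires the orchestration's instance ID to be stored where the function can read it (here, the INSTANCE_ID setting) after the orchestration starts. An alternative worth considering, not shown here, is to have the orchestrator call a notification activity as its final step, so the email is sent exactly once when the workflow completes and no timer or stored instance ID is needed.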
Conclusion
Durable Functions is an extension of Azure Functions that lets developers write stateful, long-running workflows in a serverless environment, including multi-step processes that span hours, days, or even weeks. It checkpoints workflow state to Azure Storage so orchestrations can survive failures, and, being built on the Durable Task Framework, it offers features such as durable timers, sub-orchestrations, and human-interaction patterns. Together, these capabilities make Durable Functions a powerful way to build scalable, reliable serverless applications around complex workflows.