Create Custom Azure Function Triggers
by Gerade Geldenhuys
Azure Functions is a great serverless offering from Microsoft Azure for work that needs to be processed in a short amount of time. Azure Functions is event-driven by nature, meaning you can trigger a function through various avenues, be it data sources or messaging.
Function Triggers
Azure Functions are started by triggers, and several triggers come standard out of the box for easy setup. One such trigger is the HTTP trigger, which allows you to execute a function by requesting the function endpoint from a browser. Other triggers include the Timer, Webhook and Blob storage triggers. See this page for a complete list of triggers.
Scenario
In my MedPark application there is a business process to place an order for medication from the online pharmacy store. This process must generate an invoice for orders placed before invoking the Notification service to send out an Order Placed email. For this process I decided to implement an Azure Function that can be triggered when the OrderCreatedEvent is published to RabbitMq.
For this, I need my function to be triggered by RabbitMq, but there is no RabbitMq trigger for Azure Functions out of the box, so we will have to create our own custom trigger. Below is a list of the components needed to create your own Azure Function trigger.
Attribute
This attribute will be used to decorate parameters in function run methods.
/// <summary>
/// Marks a function parameter as being fed by messages from a RabbitMq queue.
/// </summary>
[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public class RabbitMqTriggerAttribute : Attribute
{
    /// <summary>
    /// Creates the attribute for the given queue.
    /// </summary>
    /// <param name="queueName">Name stored in <see cref="QueueName"/>.</param>
    public RabbitMqTriggerAttribute(string queueName) => QueueName = queueName;

    /// <summary>Broker host; filled by <see cref="LoadRabbitSettings"/> from the "HostName" variable.</summary>
    public string HostName { get; set; }

    /// <summary>Broker user; filled by <see cref="LoadRabbitSettings"/> from the "HostUsername" variable.</summary>
    public string Username { get; set; }

    /// <summary>Broker password; filled by <see cref="LoadRabbitSettings"/> from the "Password" variable.</summary>
    public string Password { get; set; }

    /// <summary>Broker port, kept as text; filled by <see cref="LoadRabbitSettings"/> from the "Port" variable.</summary>
    public string Port { get; set; }

    /// <summary>Connection value; not populated by <see cref="LoadRabbitSettings"/>.</summary>
    public string Connection { get; set; }

    /// <summary>Queue name passed to the constructor.</summary>
    public string QueueName { get; set; }

    /// <summary>
    /// Overwrites the host, user, password and port properties with the current
    /// values of the corresponding environment variables. A variable that is not
    /// set yields <c>null</c> for its property.
    /// </summary>
    public void LoadRabbitSettings()
    {
        Port = ReadSetting("Port");
        Password = ReadSetting("Password");
        Username = ReadSetting("HostUsername");
        HostName = ReadSetting("HostName");
    }

    // Single place where settings are looked up in the process environment.
    private static string ReadSetting(string variableName)
    {
        return Environment.GetEnvironmentVariable(variableName);
    }
}
I created a method to load the RabbitMq settings at runtime, but it can easily be replaced by putting the [AutoResolve]
attribute on your properties to load them from the AppSettings.
IListener
Next, we need to implement the functionality that listens to RabbitMq for messages as they arrive on the queue we supplied. The IListener
interface has the methods we need to implement.
/// <summary>
/// Consumes messages from the queue named on a <see cref="RabbitMqTriggerAttribute"/>
/// and runs the bound function once per delivered message.
/// </summary>
/// <remarks>
/// A message whose function execution fails is re-published with a
/// "requeueCount" header; once that counter reaches the limit the delivery is
/// rejected without requeue.
/// </remarks>
public class RabbitMqTriggerListener : IListener
{
    private const string RequeueCountHeader = "requeueCount";
    private const int MaxRequeueCount = 5;

    private readonly ITriggeredFunctionExecutor _executor;
    private readonly RabbitMqTriggerAttribute _attribute;
    private readonly ILogger _logger;
    private readonly string _queueName;

    // The connection is kept so it can be closed together with the channel;
    // otherwise it would stay open after the listener stops.
    private readonly IConnection _connection;
    private readonly IModel _model;

    private EventingBasicConsumer _consumer;
    private string _consumerTag;
    private bool _started;
    private bool _disposed;
    private bool _resourcesReleased;

    /// <summary>
    /// Loads the broker settings into <paramref name="attribute"/> and opens a
    /// connection and channel to the broker.
    /// </summary>
    /// <param name="executor">Executor used to run the function for each message.</param>
    /// <param name="attribute">Trigger attribute carrying the queue name and broker settings.</param>
    /// <param name="logger">Logger for processing failures; may be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="attribute"/> is null.</exception>
    public RabbitMqTriggerListener(ITriggeredFunctionExecutor executor, RabbitMqTriggerAttribute attribute, ILogger logger)
    {
        if (attribute == null)
        {
            throw new ArgumentNullException(nameof(attribute));
        }

        _executor = executor;
        _attribute = attribute;
        _logger = logger;
        _queueName = attribute.QueueName;

        attribute.LoadRabbitSettings();

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.Uri = GenerateRabbitHostUri();

        _connection = connectionFactory.CreateConnection();
        try
        {
            _model = _connection.CreateModel();
        }
        catch
        {
            // Do not leave a half-initialised listener holding an open connection.
            _connection.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Declares the queue and starts consuming from it with manual acknowledgements.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every function execution.</param>
    /// <exception cref="ObjectDisposedException">The listener has been stopped or disposed.</exception>
    /// <exception cref="InvalidOperationException">The listener is already started.</exception>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        ThrowIfDisposed();
        if (_started)
        {
            throw new InvalidOperationException("The listener has already been started.");
        }

        // Arguments: queue, durable, exclusive, autoDelete.
        _model.QueueDeclare(_queueName, false, false, false);

        _consumer = new EventingBasicConsumer(_model);

        // The event handler cannot return a Task, so ProcessMessageAsync handles
        // every exception itself and nothing escapes from this async lambda.
        _consumer.Received += async (sender, ea) => await ProcessMessageAsync(ea, cancellationToken);

        _consumerTag = _model.BasicConsume(queue: _queueName, autoAck: false, consumer: _consumer);
        _started = true;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels the consumer and closes the channel and the connection.
    /// The listener cannot be started again afterwards.
    /// </summary>
    /// <param name="cancellationToken">Not used.</param>
    /// <exception cref="InvalidOperationException">The listener is not running.</exception>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        if (!_started)
        {
            throw new InvalidOperationException("The listener has not yet been started or has already been stopped");
        }

        _model.BasicCancel(_consumerTag);
        _model.Close();
        _connection.Close();
        _started = false;
        _disposed = true;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Creates a publish batch on the listener's channel.
    /// </summary>
    /// <returns>A new batch created by the underlying channel.</returns>
    public IBasicPublishBatch CreateBasicPublishBatch()
    {
        return _model.CreateBasicPublishBatch();
    }

    /// <summary>
    /// First retry of a failed message: adds the "requeueCount" header with
    /// value 0 and re-publishes the message.
    /// </summary>
    /// <param name="ea">The failed delivery.</param>
    internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea)
    {
        if (ea.BasicProperties.Headers == null)
        {
            ea.BasicProperties.Headers = new Dictionary<string, object>();
        }

        ea.BasicProperties.Headers[RequeueCountHeader] = 0;
        Republish(ea);
    }

    /// <summary>
    /// Subsequent retry of a failed message: increments "requeueCount" and either
    /// re-publishes the message or, once the limit is reached, rejects it
    /// without requeue.
    /// </summary>
    /// <param name="ea">The failed delivery; its headers must contain "requeueCount".</param>
    internal void RepublishMessages(BasicDeliverEventArgs ea)
    {
        int requeueCount = Convert.ToInt32(ea.BasicProperties.Headers[RequeueCountHeader]) + 1;
        ea.BasicProperties.Headers[RequeueCountHeader] = requeueCount;

        if (requeueCount < MaxRequeueCount)
        {
            Republish(ea);
        }
        else
        {
            // Reject without requeue; the broker dead-letters the message only
            // if the queue has a dead-letter exchange configured.
            _model.BasicReject(ea.DeliveryTag, false);
        }
    }

    /// <summary>
    /// Builds the AMQP URI from the settings on the trigger attribute.
    /// </summary>
    /// <returns>URI of the form amqp://user:password@host:port.</returns>
    internal Uri GenerateRabbitHostUri()
    {
        // Credentials may contain characters such as '@', ':' or '/' that would
        // otherwise be parsed as URI delimiters, so they are percent-encoded.
        string user = Uri.EscapeDataString(_attribute.Username ?? string.Empty);
        string password = Uri.EscapeDataString(_attribute.Password ?? string.Empty);

        return new Uri($"amqp://{user}:{password}@{_attribute.HostName}:{_attribute.Port}");
    }

    /// <summary>
    /// Stops the listener if it is running; does nothing otherwise.
    /// </summary>
    public void Cancel()
    {
        if (!_started)
        {
            return;
        }

        // StopAsync completes synchronously, so this does not block on pending work.
        StopAsync(CancellationToken.None).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Releases the channel and the connection. Safe to call more than once and
    /// after <see cref="StopAsync"/>.
    /// </summary>
    public void Dispose()
    {
        if (_resourcesReleased)
        {
            return;
        }

        _resourcesReleased = true;
        _started = false;
        _disposed = true;

        _model.Dispose();
        _connection.Dispose();
    }

    // Runs the function for one delivery and settles the message according to the result.
    private async Task ProcessMessageAsync(BasicDeliverEventArgs ea, CancellationToken cancellationToken)
    {
        try
        {
            FunctionResult result = await _executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = ea }, cancellationToken);
            if (result.Succeeded)
            {
                _model.BasicAck(ea.DeliveryTag, false);
                return;
            }

            IDictionary<string, object> headers = ea.BasicProperties.Headers;
            if (headers == null || !headers.ContainsKey(RequeueCountHeader))
            {
                CreateHeadersAndRepublish(ea);
            }
            else
            {
                RepublishMessages(ea);
            }
        }
        catch (Exception ex)
        {
            // The delivery is left unacknowledged here; no ack, nack or reject is sent.
            _logger?.LogError(ex, "Failed to process RabbitMq message with delivery tag {DeliveryTag}", ea.DeliveryTag);
        }
    }

    // Publishes a copy of the message back to this listener's queue, then acks the original.
    private void Republish(BasicDeliverEventArgs ea)
    {
        // The default exchange ("") routes by queue name, so the queue name is used
        // as routing key; the original routing key may belong to a different exchange.
        _model.BasicPublish(exchange: string.Empty, routingKey: _queueName, basicProperties: ea.BasicProperties, body: ea.Body);

        // Ack only after the publish call returned, so a failing publish does not
        // drop the message.
        _model.BasicAck(ea.DeliveryTag, false);
    }

    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMqTriggerListener));
        }
    }
}
The gist above creates a message consumer that listens for messages passing through the queue you supplied. We then use the ITriggeredFunctionExecutor
to execute the function.
But in order to create the IListener and process the messages we have received, we have to implement the ITriggerBinding
interface.
ITriggerBinding
The implementation of this interface binds the data from the listener to the function parameters when the listener class triggers execution of the function.
/// <summary>
/// Trigger binding for parameters decorated with <see cref="RabbitMqTriggerAttribute"/>:
/// creates the queue listener and turns a delivered message into binding data.
/// </summary>
public class RabbitMqTriggerBinding : ITriggerBinding
{
    // The contract declares no keys, so one shared empty instance is enough.
    private static readonly IReadOnlyDictionary<string, Type> EmptyContract = new Dictionary<string, Type>();

    private readonly ILogger _logger;
    private readonly ParameterInfo _parameter;

    /// <summary>
    /// Creates a binding for the given function parameter.
    /// </summary>
    /// <param name="parameter">The function parameter carrying the trigger attribute.</param>
    /// <param name="logger">Logger handed on to the listener.</param>
    /// <exception cref="ArgumentNullException"><paramref name="parameter"/> is null.</exception>
    public RabbitMqTriggerBinding(ParameterInfo parameter, ILogger logger)
    {
        if (parameter == null)
        {
            throw new ArgumentNullException(nameof(parameter));
        }

        _parameter = parameter;
        _logger = logger;
    }

    /// <summary>Type of the value the listener passes as trigger value.</summary>
    public Type TriggerValueType => typeof(BasicDeliverEventArgs);

    /// <summary>Binding data contract; declares no keys.</summary>
    public IReadOnlyDictionary<string, Type> BindingDataContract => EmptyContract;

    /// <summary>
    /// Builds the trigger data for one delivered message.
    /// </summary>
    /// <param name="value">The trigger value; expected to be a <see cref="BasicDeliverEventArgs"/>.</param>
    /// <param name="context">Not used.</param>
    /// <returns>Trigger data holding the message's binding data.</returns>
    public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
    {
        // A value of any other type yields empty binding data rather than an error.
        BasicDeliverEventArgs message = value as BasicDeliverEventArgs;
        IReadOnlyDictionary<string, object> bindingData = CreateBindingData(message);

        // NOTE(review): no value provider is supplied here - confirm that the host
        // supplies the parameter value for this binding.
        return Task.FromResult<ITriggerData>(new TriggerData(null, bindingData));
    }

    /// <summary>
    /// Creates the listener for the queue named on the parameter's trigger attribute.
    /// </summary>
    /// <param name="context">Supplies the function executor.</param>
    /// <returns>A new <see cref="RabbitMqTriggerListener"/>.</returns>
    public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
    {
        var executor = context.Executor;
        RabbitMqTriggerAttribute attribute = _parameter.GetCustomAttribute<RabbitMqTriggerAttribute>();
        var listener = new RabbitMqTriggerListener(executor, attribute, _logger);
        return Task.FromResult<IListener>(listener);
    }

    /// <summary>
    /// Describes the trigger parameter.
    /// </summary>
    /// <returns>A descriptor named after the bound parameter.</returns>
    public ParameterDescriptor ToParameterDescriptor()
    {
        return new TriggerParameterDescriptor
        {
            // The descriptor name is the real parameter name, not a display label.
            Name = _parameter.Name,
            DisplayHints = new ParameterDisplayHints
            {
                Prompt = "RabbitMq Trigger",
                Description = "RabbitMq message trigger"
            }
        };
    }

    /// <summary>
    /// Copies the delivery's properties into a case-insensitive dictionary keyed
    /// by property name.
    /// </summary>
    /// <param name="value">The delivered message; may be null.</param>
    /// <returns>The binding data; empty when <paramref name="value"/> is null.</returns>
    internal static IReadOnlyDictionary<string, object> CreateBindingData(BasicDeliverEventArgs value)
    {
        var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
        if (value == null)
        {
            // Same result as before, without raising and swallowing one
            // NullReferenceException per property.
            return bindingData;
        }

        SafeAddValue(() => bindingData.Add(nameof(value.ConsumerTag), value.ConsumerTag));
        SafeAddValue(() => bindingData.Add(nameof(value.DeliveryTag), value.DeliveryTag));
        SafeAddValue(() => bindingData.Add(nameof(value.Redelivered), value.Redelivered));
        SafeAddValue(() => bindingData.Add(nameof(value.Exchange), value.Exchange));
        SafeAddValue(() => bindingData.Add(nameof(value.RoutingKey), value.RoutingKey));
        SafeAddValue(() => bindingData.Add(nameof(value.BasicProperties), value.BasicProperties));
        SafeAddValue(() => bindingData.Add(nameof(value.Body), value.Body));

        return bindingData;
    }

    // Deliberately best-effort: a property that cannot be read is left out of the binding data.
    private static void SafeAddValue(Action addValue)
    {
        try
        {
            addValue();
        }
        catch
        {
            // some message property getters can throw, based on the
            // state of the message
        }
    }
}
ITriggerBindingProvider
The implementation of ITriggerBindingProvider
looks for parameters of our Azure Functions that carry the RabbitMqTrigger attribute we created at the beginning. When it finds one, it instantiates a new RabbitMqTriggerBinding for that parameter.
/// <summary>
/// Creates a <see cref="RabbitMqTriggerBinding"/> for function parameters that
/// are decorated with <see cref="RabbitMqTriggerAttribute"/>.
/// </summary>
public class RabbitMqTriggerBindingProvider : ITriggerBindingProvider
{
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the provider.
    /// </summary>
    /// <param name="logger">Logger handed on to every binding created.</param>
    public RabbitMqTriggerBindingProvider(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Returns a binding for the parameter in <paramref name="context"/>, or a
    /// null binding when the parameter has no <see cref="RabbitMqTriggerAttribute"/>.
    /// </summary>
    /// <param name="context">Context describing the parameter being bound.</param>
    /// <returns>A completed task holding the binding, or null.</returns>
    /// <exception cref="InvalidOperationException">
    /// The parameter has the attribute but is not of type <see cref="BasicDeliverEventArgs"/>.
    /// </exception>
    public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
    {
        var target = context.Parameter;
        ITriggerBinding binding = null;

        bool isRabbitTrigger = target.GetCustomAttribute<RabbitMqTriggerAttribute>(false) != null;
        if (isRabbitTrigger)
        {
            if (target.ParameterType != typeof(BasicDeliverEventArgs))
            {
                throw new InvalidOperationException("Invalid parameter type");
            }

            binding = new RabbitMqTriggerBinding(target, _logger);
        }

        return Task.FromResult(binding);
    }
}
Last but not least, we will implement the IExtensionConfigProvider
interface to create a new binding rule that uses the RabbitMqTriggerBindingProvider to resolve parameters decorated with RabbitMqTriggerAttribute.
/// <summary>
/// Extension entry point that registers the RabbitMq trigger binding rule.
/// </summary>
public class RabbitMqExtensionConfigProvider : IExtensionConfigProvider
{
    private ILogger _logger;

    /// <summary>
    /// Creates the provider.
    /// </summary>
    /// <param name="logger">Logger handed on to the trigger binding provider.</param>
    public RabbitMqExtensionConfigProvider(ILogger logger) => _logger = logger;

    /// <summary>
    /// Adds a binding rule for <see cref="RabbitMqTriggerAttribute"/> and binds it
    /// to a trigger whose value type is <see cref="BasicDeliverEventArgs"/>.
    /// </summary>
    /// <param name="context">Extension configuration context the rule is added to.</param>
    public void Initialize(ExtensionConfigContext context)
    {
        context
            .AddBindingRule<RabbitMqTriggerAttribute>()
            .BindToTrigger<BasicDeliverEventArgs>(new RabbitMqTriggerBindingProvider(_logger));
    }
}
Lastly, we need to register the extension configuration provider we have created in the Startup.cs of the Azure Functions project to add our extension.
Trigger In Action
Below is a sample implementation of using the RabbitMq trigger.
/// <summary>
/// Sample function triggered by messages on the basket-checked-out invoice queue.
/// </summary>
public static class OrderPlaceInvoiceGenerator
{
    /// <summary>
    /// Logs the body of the received message.
    /// </summary>
    /// <param name="args">The RabbitMq delivery that triggered the function.</param>
    /// <param name="log">Function logger.</param>
    [FunctionName("OrderPlaceInvoiceGenerator")]
    public static void Run(
        [RabbitMqTrigger("medpark.orderservice/basket-service.basket_checked_out_invoice_gen")] BasicDeliverEventArgs args,
        ILogger log)
    {
        // Going through ReadOnlyMemory<byte> compiles whether Body is declared as
        // byte[] or as ReadOnlyMemory<byte>. Decoding as UTF-8 is an assumption
        // about the publisher - TODO confirm.
        System.ReadOnlyMemory<byte> body = args.Body;
        string message = System.Text.Encoding.UTF8.GetString(body.ToArray());

        log.LogInformation("Message received from RabbitMQ trigger: {message}", message);
    }
}
In the Order service, once an order is placed, an event is published to the medpark.orderservice/basket-service.basket_checked_out_invoice_gen
queue; this function then picks it up and generates the Invoice document.
Thank you for reading.