Azure Fonctions & Triggers personnalisés.

.NET CoreAzureAzure functionTrigger
Pierrick Gourlain - 16/12/2021 à 19:36:020 commentaire

Qu’est-ce qu’une azure fonction ?

Si ce sigle ne vous parle pas, rendez-vous ici pour une introduction: https://docs.microsoft.com/fr-fr/azure/azure-functions/functions-overview

 

Les azure fonctions ont la particularité de pouvoir être déclenchées par des évènements. Ces évènements peuvent être de type

-       Timer

-       Http

-       Message SignalR

-       Modification de document CosmoDB

-       Message Queue

-       Message RabbitMQ

-       Message Kafka

-       …

 

Ces déclencheurs sont appelés des ‘Triggers’, et il existe un framework pour écrire ses propre triggers. Ce qui vous permet de déclencher une azure fonction depuis n’importe quelle source de donnée. Nous allons donc voir comment en écrire un.

 


Un Trigger FTP


Le principe : « recevoir » les fichiers d’un répertoire particulier sur un FTP, pour effectuer un traitement

   schema 1

L’idée est de faire croire au code métier que c’est le FTP qui « pousse » de la donnée dans l’Azure Fonction. Le code métier ressemblera à ceci :

[FunctionName("Function1")]
public static Task Run([FtpTrigger("myFtpConfig")] FtpMessage req, ILogger log)
{
 log.LogInformation($"C# FTP trigger function processed a file {req.FileName}, size:{req.FileContent.Length}.");
 return Task.CompletedTask;
}
   

-       On remarque qu’il n’y pas de code technique : celui-ci est « caché » derrière l’attribut ‘FtpTrigger’

-       Le fichier reçu est considéré comme un message

-       Le code est plus simple et facile à maintenir

 

 

Comment ça marche :

Le schéma 1, indique que le serveur FTP « appel » l’Azure fonction. D’un point de vue technique ce n’est pas le cas, …bizarre, puisque dans le code de l’azure fonction, il n’y a rien qui fait appel au FTP et le seveur FTP n’effectue aucun appel … comment ça marche ????

La réalité est un peu différente. En effet un serveur FTP est en écoute, et une azure fonction est également en écoute. Il faut donc un élément intermédiaire pour relier les deux, un sorte de « médiateur ».

Le « médiateur » caché derrière chaque « trigger » n’est qu’un « Thread » qui va faire du polling vers la source de donnée et appeler la méthode de l’Azure fonction. On peut donc schématiser notre architecture comme suit :

On remarque qu’il existe donc un bout de code, qui va interroger le serveur FTP et appeler la méthode de l’Azure fonction. Ce qui donne l’impression(du point de vue de l’AF) que c’est le serveur FTP appelle la méthode de l’AF.

Ainsi avec cette technique on peut facilement imaginer des triggers de type SQL, FTP, …

On peut donc considérer notre trigger contient le code de liaison entre la source de donnée(ici FTP) et l’Azure Fonction.

Donc derrière chaque trigger, il existe du code qui « tourne ». En règle général c’est un thread dédié qui va gérer la lecture de la donnée et l’appel à l’AF(Azure Fonction).

Nous venons de voir que derrière chaque trigger, il existe du code de liaison entre une source de donnée et une Azure fonction. Entrons en dans le détails…

 

 


Ouvrons le capot

 

Derrière un « XXXTrigger » se cache tout un framework d’extension autour des Azure fonctions. En effet, dans l’exemple précédent nous utilisons un attribut ‘FtpTriggerAttribute’. Et cet attribut est le point d’entrée visible du code de liaison entre le FTP et l’AF. Pour réaliser un trigger personnalisé, nous allons devoir implementer plusieurs classes pour s’interfacer avec le framework d’extension AF.


Le schéma global de ce qui se cache derrière peut être répresenté ainsi :

      Schéma 2

« User Code » en vert : represente le code de l’utilisateur de notre « custom trigger »

« runtime » en jaune : représente le code d’initialisation du trigger

« Thread details » en orange : représente le code d’execution du trigger (le code de liaison du FTP vers l’AF)

Pour avoir un trigger complet et fonctionnel, il nous faut implémenter la partie « jaune » et « orange ».

 


Le rôle de l’attribut « FtpTriggerAttribute »

 

  [AttributeUsage(AttributeTargets.Parameter)]
  [Binding(TriggerHandlesReturnValue = true)]
  public class FtpTriggerAttribute : GenericTriggerAttribute
  {    
    public FtpTriggerAttribute(string configKey)
    {
      this.ConfigKey = configKey;
    }
  }

 

Son rôle est de « configurer » l’azure fonction qui sera appelé par le « thread » dédié. Cet attribut est utilisé par le développeur du code « métier ». Il est donc important que celui-ci soit simple à utiliser.

[FunctionName("Function1")]
public static Task Run([FtpTrigger("myFtpConfig")] FtpMessage req, ILogger log)
{
  log.LogInformation($"C# FTP trigger function processed a file {req.FileName}, size:{req.FileContent.Length}.");
  return Task.CompletedTask;
}

 

Ici l’utilisateur (le développeur) fournit la clé de configuration à utiliser. La configuration comportera les élément de connexion à notre source de donnée(FTP).

 

 


Enregistrer son extension

Nous venons de voir le code côté utilisateur de notre extension. En tant que concepteur d’une extension vous devez utiliser le framework fournit par Microsoft. La première chose est d’enregistrer votre extension et cela pendant la phase d’initialisation de l’Azure Fonction (en réalité d’un webjob GitHub - Azure/azure-webjobs-sdk-extensions: Azure WebJobs SDK Extensions)

  internal class FtpWebJobsStartup : IWebJobsStartup
  {
    public void Configure(IWebJobsBuilder builder)
    {
      builder.AddExtension<FtpExtensionConfig>();
    }
  }

 

 

Et AddExtension est déclarée comme ceci :

  public static IWebJobsExtensionBuilder AddExtension<TExtension>(this IWebJobsBuilder builder) where TExtension : class, IExtensionConfigProvider;

 

L’étape suivante est donc de déclarer sa propre implémentation de ‘IExtensionConfigProvider

 

 


IExtensionConfigProvider

 

« FtpExtensionConfig » est la classe que nous venons d’enregistrer, et celle-ci doit implémenter l’interface « IExtensionConfigProvider » ainsi que porter un attribut [Extension(….)]

    [Extension("ftp", configurationSection: "ftp")]
  internal class FtpExtensionConfig : IExtensionConfigProvider

L’implémentation de cette interface, va nous permettre d’enregistrer notre extension pendant le démarrage de l’AF.

Nous allons donc coder dans cette classe les « capacités » de notre extension, à savoir un trigger. La documentation concernant les extensions autour des Azure functions commence ici : https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-register

Dans notre cas, nous allons « lier » notre attribut (FTPTrigger) à un « bindingProvider »

public void Initialize(ExtensionConfigContext context)
{
  //use generic provider, with our specific Type dedicated to FTP
  var triggerBindingProvider = new GenericTriggerAttributeBindingProvider<
    FtpTriggerAttribute, 
    ConnectionOptions, 
    FtpMessage, 
    FtpActionResult, 
    FtpTriggerListener>(_configuration, _nameResolver, _loggerFactory, TRIGGERLOGCATEGORY);
  //add link between FtpTriggerAttribute <--> our bindingProvider 
  context.AddBindingRule<FtpTriggerAttribute>()
    .BindToTrigger(triggerBindingProvider);
}

Ici nous mettons en place la liaison entre un attribut et un fournisseur de binding.

Dans notre cas, le développeur déclare un « binding » en écrivant ceci :

  [FtpTrigger("myFtpConfig")] FtpMessage req

En appposant l’attribut « FtpTrigger » sur son paramètre d’AF, et celui-ci sera remplit automatiquement au moment de l’appel de l’AF.

 

 


ITriggerBindingProvider

Un TriggerBindingProvider est une factory de TriggerBinding. Dans notre exemple nous allons extraire les informations de l’attribut ‘FTPTrigger’, effectuer des vérifications des paramètres de l’AF.

public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
{
  if (context is null)
  {
    throw new ArgumentNullException(nameof(context));
  }
 
  //extract info from attribute
  ParameterInfo parameter = context.Parameter;
  TTriggerAttribute triggerAttribute = parameter.GetCustomAttribute<TTriggerAttribute>(inherit: false);
  if (triggerAttribute is null)
  {
    return Task.FromResult<ITriggerBinding>(null);
  }
  //check type of trigger parameter
  CheckParameterType(parameter.ParameterType);
  //create trigger binding
  var triggerBindingInstance = (ITriggerBinding)new GenericTriggerBinding<TOptions, TMessage, TResult, TListener>(_configuration, _logger, triggerAttribute.ConfigKey);
 
  return Task.FromResult<ITriggerBinding>(triggerBindingInstance);
} 

 

Une fois les vérifications effectuées, une instance de ITriggerBinding peut être renvoyée.  C’est ici qu’est utilisé la clé de configuration qui à été positionné dans le code métier (triggerAttribute.ConfigKey )

Si les vérifications ne passent pas, une exception est soulevée, et l’AF ne démarre pas. Ici CheckParameterType(…) va vérifier que le type du paramete est bien celui attendu.

 


ITriggerBinding

Le role d’un ITriggerBinding est de fournir

-       Une description du parametre pour ce trigger binding

-       Le « ValueProvider »  pour le paramètre de la fonction lors de son invocation

-       Un « Ilistener » qui sera responsable d’invoquer l’AF après lecture de la source de donnée, via un ITriggerFunctionExecutor.

Et de manière optionnelle, ce TriggerBinding fournit le ‘binding’ de la valeur de retour de l’AF si celle-ci retourne un valeur.

public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
  LogInformation("BindAsync");
 
  var valueProvider = new ObjectValueProvider(value, this.TriggerValueType);
  var aggregateBindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
 
  var result = new TriggerData(valueProvider, aggregateBindingData);
  result.ReturnValueProvider = _memoryResponseHandler;
  return Task.FromResult<ITriggerData>(result);
}
 
public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
{
  LogInformation("CreateListenerAsync");
  if (context == null)
  {
    throw new ArgumentNullException(nameof(context));
  }
  TOptions options = new TOptions();
  //available keys list for configuration
  //AzureFunctionsJobHost, AzureWebJobsConfigurationSection, AzureWebJobsScriptRoot
  var section = _configuration.GetSection($"AzureFunctionsJobHost:{_configKey}");
  section.Bind(options);
 
  var result = Activator.CreateInstance(typeof(TListener), context.Executor, context.Descriptor.Id, _logger, options, _memoryResponseHandler);
  return Task.FromResult((IListener)result);
}

 

Ici on remarque que _configKey est utilisé pour aller lire la configuration du FTP (url, user, pwd, …). En effet le thread aura besoin de toutes les informations de connexion au FTP pour lire la source de donnée et invoquer l’AF.

 

 


IListener

Le listener c’est le coeur de notre « TriggerBinding », il est responsable de créer un/des thread(s) pour réaliser la lecture des fichiers depuis le FTP et d’appeler l’AF avec le resultat.

Pour simplifier la gestion du listener, dans notre exemple une classe de base à été réalisée. Celle-ci a pour objectif de gérer le code technique du thread et de la politique de reprise en cas d’echec sur l’appel de l’AF. Le code est fournit, vous pourrez donc regardez en détails si vous en avez besoin.

Ainsi, la classe dérivée « FtpTriggerListener », ne contient que du code dédié à :

-       la récupération des fichiers sur le FTP

-       la gestion du polling

/// <summary>
/// call on each loop, and wait 10 seconds
/// </summary>
/// <param name="peviousReadElapsed"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected override Task PreviousReadMessages(TimeSpan peviousReadElapsed, CancellationToken cancellationToken)
{
  //We define a polling to 10 seconds
  //wait 10 seconds between loop
  if (peviousReadElapsed.TotalSeconds < 10)
  {
    var timeToWait = TimeSpan.FromSeconds(10).Subtract(peviousReadElapsed);
    cancellationToken.WaitHandle.WaitOne(timeToWait);
  }
  return Task.CompletedTask;
}
 
/// <summary>
/// read file from ftp depending on user configuration
/// </summary>
/// <returns></returns>
protected override async IAsyncEnumerable<FtpMessage> ReadMessages()
{
  var creds = new NetworkCredential(_options.Username, _options.Password);
  //list files on FTP
  var fileNames = await ReadFileNamesAsync(creds);
  foreach (var fileName in fileNames)
  {
    //foreach each file read content
 
    FtpWebRequest request = (FtpWebRequest)WebRequest.Create($"{_options.Host}/{_options.InFolder}/{fileName}");
    request.Method = WebRequestMethods.Ftp.DownloadFile;
    request.UseBinary = true;
    request.Credentials = creds;
    request.EnableSsl = _options.secureFTP;
    using FtpWebResponse responseContent = (FtpWebResponse)await request.GetResponseAsync();
    using (Stream responseContentStream = responseContent.GetResponseStream())
    {
      //TODO: read on demand => implement async reader
      var memStm = new MemoryStream();
      responseContentStream.CopyTo(memStm);
      yield return new FtpMessage()
      {
        FileName = fileName,
        FileContent = memStm.ToArray()
      };
    }
  }
 
}
 
protected override Task ProceedWithSuccessAsync(FtpMessage? message, FtpActionResult fnResult)
{
   // this method is called after successfully AF call
  //delete or move file to OutFolder
  return base.ProceedWithSuccessAsync(message, fnResult);
}
 
protected override Task ProceedWithFailureAsync(FtpMessage? message)
{
  //this method is called after unsuccessfull AF call
   //move file to ErrFolder
  return base.ProceedWithFailureAsync(message);
}

 

 

Comme vous pouvez le constater ci-dessus, l’implementation FTP ne contient que la lecture des fichiers et la gestion du temps d’attente (polling) entre deux lectures FTP

L’algorithme principal qui se trouve dans la classe de base ressemble à ceci (pseudo code):

  while (!EndRequested)
{
  PreviousReadMessages();
  foreach (var message in ReadMessages())
  {
    while (retry < _maxRetry)
    {
      var fnResult = TryExecuteAF(message);
      if (fnResult.Succeeded)
      {
        retyr = maxRetry;
        ProceedWithSuccessAsync(message, fnResultValue);
      } 
      else
      {
        retry++;
        if (retry < _maxRetry)
        {
          BackoffWait(retry);
        }
        else
        {
          ProceedWithFailureAsync(message);         
        }
      }
    }  
  }
}
 

Le code de liaison est donc « Readmessages() » et « TryExecuteAF() ». Retrouvez le code source complet ici : https://github.com/Expaceo/FTPBindings

 

En générant le graphe de dépendance, on remarque bien que notre FtpTriggerListener est le dernier maillon de la chaine

 

Nous avons donc vu les élements à implémenter pour faire son trigger personnalisé. Et vous pouvez analyser/debugger clonant le code qui se trouve ici : https://github.com/Expaceo/FTPBindings

 


Q & A

 

Pourquoi écrire son trigger personalisé, au lieu d’utiliser un « timer trigger » et une librairie partagée par tous les projets ?

-       Essentiellement pour le scaling. En effet le timer trigger ne peut pas être « scalé » (mis à l’echelle) en fonction d’un paramètre personalisé, le nombre de fichiers, la taille,…

-       Le corps de l’AF contient que le code métier

-       La gestion de la « transaction », n’est pas dans le code métier

o  La fonction renvoi un « code » pour dire « ok » ou « ko » et le trigger déplace/supprime le fichier en fonction de celui-ci.

Quels sont les problèmes liés a ce trigger :

-       La gestion des fichiers volumineux :

o  Il faudrait fournir un stream à l’Azure fonction

o  L’objectif ici n’est pas de fournir un FTPTrigger « production ready », mais bien de découvir l’écriture de celui-ci.

-       La gestion du scaling :

o  Ici, dans notre extension, il manque l’implementation de « IScaleMonitorProvider » au niveau du listener.

o  L’implémentation de IScaleMonitorProvider permet de gérer finement le nombre d’instance de l’AF.

 


Conclusion

Pour rappel, nous avons implémenter ceci :

 

En conclusion, l’écriture de Trigger personnalisés, permettent d’intégrer des scénarios « Legacy/Hybride » avec des Azure Fonctions. Graçe à ces Triggers nous pouvons imaginer différents scénarii d’intégration du mode « réactif » ou « push » dans nos architectures existantes. L’écriture de trigger personnalisés force la séparation du code technique du code fonctionnel (au sein de l’AF)

L’écriture d’une extension n’est pas triviale, en effet cela reste du code assez technique. Il peut donc être judicieux de regarder d’abord s’il n’existe pas quelque chose sur étagère pour votre besoin. Et pour être efficace il faut implémenter la gestion du scaling.


 

Le code complet ici : https://github.com/Expaceo/FTPBindings

 

 

 

 

 

Commentaires :

Aucun commentaires pour le moment


Laissez un commentaire :

Réalisé par
Expaceo