Introduction

Dans les architectures logicielles modernes, le découplage entre composants est un enjeu majeur. Comment permettre à un service d'envoyer une commande sans connaître le service qui la traite ? Comment publier un événement sans savoir combien de consommateurs l'écoutent ? La réponse réside dans le pattern Mediator.

Depuis plusieurs années, pour mettre en place ce pattern dans mes logiciels, j'utilise la librairie open source MediatR, mais depuis la version 12, cette librairie est devenue commerciale. Il y avait 2/3 trucs qui me plaisait pas trop dans l'utilisation de cette lib, et le fait quelle soit devenue commerciale m'a poussé à développer ma propre lib. Bien sur il y a des alternatives open source, mais le fait que ce pattern soit au coeur de mes développements, voir de nouveau une lib devenir commerciale est un red flag pour moi, et je ne veux pas re-changer un nouvelle fois. D'autant que j'aime ce challenge :)

D'ailleurs j'ai essayé Wolverine, c'est un super projet, mais ça fait trop de choses differentes et je préfère les responsabilités simples.

Du coup, j'ai créé pour mes besoins perso, une lib equivalente, que j'ai nommé ChannelMediator qui implémente le pattern Mediator en s'appuyant sur System.Threading.Channels, offrant des performances supérieures et une compatibilité en tout point avec API avec MediatR.


Le Pattern Mediator

Problème : le couplage direct

Un petit historique ne fait pas de mal pour comprendre pourquoi j'ai mis ce pattern au coeur de tous mes developpements. Sans mediator, chaque composant doit connaître explicitement ses dépendances. Un contrôleur appelle directement un service, qui appelle un repository, qui appelle un autre service… Le graphe de dépendances devient vite un plat de spaghetti, et ça, j'aime vraiment pas !.

graph LR
    A[Controller] --> B[OrderService]
    A --> C[EmailService]
    A --> D[LogService]
    B --> C
    B --> D
    C --> D
    style A fill:#ff6b6b,color:#fff
    style B fill:#ff6b6b,color:#fff
    style C fill:#ff6b6b,color:#fff
    style D fill:#ff6b6b,color:#fff

Chaque composant est couplé aux autres. Ajouter un nouveau service nécessite de modifier tous les appelants. On dirait l'empilage des lois et normes en France :)

Solution : le Mediator

Le Mediator introduit un intermédiaire central. Les composants n'interagissent plus entre eux directement, mais envoient des messages au Mediator, qui les route vers les bons handlers (Spécifiquement à MediatR et ma nouvelle lib, les handlers sont découverts par réflexion)

graph TB
    A[Controller] -->|Send Request| M((Mediator))
    M -->|Route| B[OrderHandler]
    M -->|Route| C[EmailHandler]
    M -->|Route| D[LogHandler]
    style M fill:#4ecdc4,color:#fff,stroke:#333,stroke-width:3px
    style A fill:#45b7d1,color:#fff
    style B fill:#96ceb4,color:#fff
    style C fill:#96ceb4,color:#fff
    style D fill:#96ceb4,color:#fff

Les deux modes de communication

Le pattern Mediator supporte deux types de messages fondamentaux :

graph TB
    subgraph "Request / Response (1:1)"
        R1[Sender] -->|"Send(Request)"| M1((Mediator))
        M1 -->|Route| H1[Handler unique]
        H1 -->|Response| M1
        M1 -->|Response| R1
    end

    subgraph "Notification / Event (1:N)"
        R2[Publisher] -->|"Publish(Event)"| M2((Mediator))
        M2 -->|Broadcast| H2[Handler A]
        M2 -->|Broadcast| H3[Handler B]
        M2 -->|Broadcast| H4[Handler C]
    end

    style M1 fill:#4ecdc4,color:#fff,stroke:#333,stroke-width:3px
    style M2 fill:#4ecdc4,color:#fff,stroke:#333,stroke-width:3px
Mode Cardinalité Retour Cas d'usage
Request/Response 1 sender → 1 handler Oui (TResponse) Query, Command avec résultat
Command 1 sender → 1 handler Non (Unit) Fire-and-forget structuré
Notification 1 publisher → N handlers Non Events, side-effects

Avantages du pattern

  • Découplage : l'émetteur ne connaît pas le récepteur
  • Single Responsibility : chaque handler a une seule responsabilité
  • Testabilité : les handlers sont testables indépendamment
  • Extensibilité : ajouter un handler ne modifie aucun code existant
  • Pipeline : possibilité d'injecter des comportements transversaux (logging, validation, etc.)

Architecture de ChannelMediator

Vue d'ensemble

ChannelMediator se distingue par son architecture basée sur System.Threading.Channels. Au lieu de résoudre et d'invoquer les handlers de façon synchrone sur le thread appelant (comme MediatR), ChannelMediator encapsule chaque requête dans une enveloppe (envelope) et l'envoie dans un channel asynchrone. Un pump dédié consomme ces enveloppes et dispatche les requêtes vers les handlers appropriés.

graph LR
    subgraph "Thread appelant"
        S[Sender]
    end

    subgraph "Channel (thread-safe queue)"
        S -->|WriteAsync| CW["Channel Writer"]
        CW --> Q["Unbounded Queue<br/>(FIFO)"]
        Q --> CR["Channel Reader"]
    end

    subgraph "Pump (background task)"
        CR -->|ReadAllAsync| P["ProcessAsync Loop"]
        P -->|DispatchAsync| H["Handler"]
    end

    H -->|TrySetResult| TCS["TaskCompletionSource"]
    TCS -->|await| S

    style Q fill:#ffd93d,color:#333,stroke:#333,stroke-width:2px
    style P fill:#4ecdc4,color:#fff

Diagramme de classes principal

classDiagram
    class IMediator {
        <<interface>>
        +Send~TResponse~(IRequest~TResponse~ request, CancellationToken ct) Task~TResponse~
        +Send(IRequest request, CancellationToken ct) Task
        +Send(object request, CancellationToken ct) Task~object?~
        +Publish~TNotification~(TNotification notification, CancellationToken ct) Task
        +Publish(object notification, CancellationToken ct) Task
    }

    class Mediator {
        -handlers : IReadOnlyDictionary
        -notificationHandlers : IReadOnlyDictionary
        -channel : Channel~IRequestEnvelope~
        -pump : Task
        -ProcessAsync() Task
        +Dispose()
        +DisposeAsync() ValueTask
    }

    class IMediatorFactory {
        <<interface>>
        +CreateMediator() IMediator
    }

    class MediatorFactory {
        +CreateMediator() IMediator
    }

    class IRequestEnvelope {
        <<interface>>
        +DispatchAsync(handlers, CancellationToken) ValueTask
    }

    class RequestEnvelope~TResponse~ {
        -IRequest~TResponse~ _request
        -TaskCompletionSource~TResponse~ _completionSource
        -CancellationToken _callerToken
        +DispatchAsync(handlers, CancellationToken) ValueTask
    }

    IMediator <|.. Mediator
    IMediatorFactory <|.. MediatorFactory
    MediatorFactory --> Mediator : creates
    IRequestEnvelope <|.. RequestEnvelope
    Mediator --> IRequestEnvelope : processes
    Mediator --> IRequestHandlerWrapper : dispatches to

    style IMediator fill:#4ecdc4,color:#fff
    style Mediator fill:#45b7d1,color:#fff
    style IMediatorFactory fill:#4ecdc4,color:#fff

Contrats (ChannelMediator.Contracts)

Comme avec MediatR j'ai séparé le package ChannelMediator.Contracts. Il ne contient que les interfaces, sans aucune dépendance. Les projets Domain/Contracts n'ont besoin de référencer que ce package léger.

classDiagram
    class IRequest~TResponse~ {
        <<interface>>
    }

    class IRequest {
        <<interface>>
    }

    class INotification {
        <<interface>>
    }

    class Unit {
        <<struct>>
        +Value Unit$
        +Task Task~Unit~$
        +ValueTask ValueTask~Unit~$
    }

    IRequest~TResponse~ <|-- IRequest : TResponse = Unit
    IRequest~Unit~ <|.. IRequest

    style IRequest fill:#96ceb4,color:#fff
    style INotification fill:#96ceb4,color:#fff
    style Unit fill:#ffd93d,color:#333

Handlers et Pipeline

classDiagram
    class IRequestHandler {
        <<interface>>
        +Handle(TRequest, CancellationToken) Task~TResponse~
    }

    class ICommandHandler {
        <<interface>>
        +Handle(TRequest, CancellationToken) Task
    }

    class INotificationHandler {
        <<interface>>
        +Handle(TNotification, CancellationToken) Task
    }

    class IPipelineBehavior {
        <<interface>>
        +HandleAsync(TRequest, next, CancellationToken) ValueTask~TResponse~
    }

    class RequestHandlerWrapper {
        -IServiceProvider _serviceProvider
        +HandleAsync(object, CancellationToken) ValueTask~object~
    }

    RequestHandlerWrapper --> IRequestHandler : resolves and invokes
    RequestHandlerWrapper --> IPipelineBehavior : chains behaviors

    style IPipelineBehavior fill:#ff6b6b,color:#fff
    style IRequestHandler fill:#96ceb4,color:#fff
    style ICommandHandler fill:#96ceb4,color:#fff
    style INotificationHandler fill:#96ceb4,color:#fff
    style RequestHandlerWrapper fill:#45b7d1,color:#fff

Flux complet d'une requête

sequenceDiagram
    participant Caller
    participant Mediator
    participant Channel
    participant Pump
    participant Wrapper as RequestHandlerWrapper
    participant Behavior as PipelineBehavior
    participant Handler

    Caller->>Mediator: Send(request)
    Mediator->>Mediator: Create TaskCompletionSource
    Mediator->>Mediator: Wrap in RequestEnvelope
    Mediator->>Channel: WriteAsync(envelope)
    Caller-->>Caller: await completionSource.Task

    Note over Pump: Background loop
    Pump->>Channel: ReadAllAsync()
    Channel-->>Pump: envelope
    Pump->>Pump: envelope.DispatchAsync()
    Pump->>Wrapper: HandleAsync(request)
    Wrapper->>Wrapper: Resolve behaviors (DI)
    Wrapper->>Behavior: HandleAsync(request, next)
    Behavior->>Behavior: Before logic
    Behavior->>Handler: Handle(request)
    Handler-->>Behavior: response
    Behavior->>Behavior: After logic
    Behavior-->>Wrapper: response
    Wrapper-->>Pump: response
    Pump->>Pump: TrySetResult(response)
    Caller-->>Caller: response received!

Guide pratique

Configuration de base

using ChannelMediator;

var builder = Host.CreateDefaultBuilder(args);
builder.ConfigureServices((context, services) =>
{
    // Enregistre le mediator + scan automatique des handlers
    services.AddChannelMediator(config =>
    {
        config.Strategy = NotificationPublishStrategy.Parallel;
    }, typeof(Program).Assembly);
});

Définir une Request avec réponse

// Contrat (dans le projet Domain/Contracts)
public record AddToCartRequest(string ProductCode) : IRequest<CartItem>;

public record CartItem(string ProductCode, int Quantity, decimal Total);

// Handler (dans le projet Application/Infrastructure)
public class AddToCartHandler : IRequestHandler<AddToCartRequest, CartItem>
{
    public Task<CartItem> Handle(AddToCartRequest request, CancellationToken cancellationToken)
    {
        var item = new CartItem(request.ProductCode, 1, 29.99m);
        return Task.FromResult(item);
    }
}

Définir une Command (sans réponse)

// Contrat
public record SendEmailCommand(string To, string Subject, string Body) : IRequest;

// Handler
public class SendEmailHandler : IRequestHandler<SendEmailCommand>
{
    public Task Handle(SendEmailCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Email envoyé à {request.To}: {request.Subject}");
        return Task.CompletedTask;
    }
}

Définir une Notification

// Contrat
public record OrderPlacedEvent(string OrderId, decimal Amount) : INotification;

// Handler 1
public class SendOrderConfirmation : INotificationHandler<OrderPlacedEvent>
{
    public Task Handle(OrderPlacedEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Confirmation envoyée pour commande {notification.OrderId}");
        return Task.CompletedTask;
    }
}

// Handler 2
public class UpdateInventory : INotificationHandler<OrderPlacedEvent>
{
    public Task Handle(OrderPlacedEvent notification, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Stock mis à jour pour commande {notification.OrderId}");
        return Task.CompletedTask;
    }
}

Utiliser le Mediator

public class OrderController
{
    private readonly IMediator _mediator;

    public OrderController(IMediator mediator) => _mediator = mediator;

    public async Task<CartItem> AddToCart(string productCode)
    {
        // Request/Response : un seul handler, retourne un résultat
        return await _mediator.Send(new AddToCartRequest(productCode));
    }

    public async Task SendEmail(string to, string subject, string body)
    {
        // Command : un seul handler, pas de retour
        await _mediator.Send(new SendEmailCommand(to, subject, body));
    }

    public async Task PlaceOrder(string orderId, decimal amount)
    {
        // Notification : tous les handlers enregistrés sont appelés
        await _mediator.Publish(new OrderPlacedEvent(orderId, amount));
    }
}

Pipeline Behaviors (comportements transversaux)

Les pipeline behaviors permettent d'intercepter chaque requête avant et après son traitement, sans modifier le handler. C'est l'équivalent d'un middleware ASP.NET, mais au niveau du mediator. J'ai implémenté également la possibilité de targeter une request particulière possiblement.

graph LR
    R[Request] --> B1[Logging] --> B2[Validation] --> B3[Performance] --> H[Handler]
    H --> B3 --> B2 --> B1 --> Res[Response]

    style B1 fill:#ff6b6b,color:#fff
    style B2 fill:#ffd93d,color:#333
    style B3 fill:#4ecdc4,color:#fff
    style H fill:#96ceb4,color:#fff

Behavior global (s'applique à toutes les requêtes)

public class PerformanceMonitoringBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    public async ValueTask<TResponse> HandleAsync(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var sw = Stopwatch.StartNew();

        var response = await next(); // Appel au handler (ou behavior suivant)

        sw.Stop();
        Console.WriteLine($"[PERF] {typeof(TRequest).Name} : {sw.ElapsedMilliseconds}ms");

        return response;
    }
}

// Enregistrement global (s'applique à TOUS les handlers)
services.AddOpenPipelineBehavior(typeof(PerformanceMonitoringBehavior<,>));

Behavior spécifique (ciblé sur un type de requête)

public class ValidationBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    public async ValueTask<TResponse> HandleAsync(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        if (request is AddToCartRequest cartRequest
            && string.IsNullOrEmpty(cartRequest.ProductCode))
        {
            throw new ArgumentException("Le code produit est obligatoire");
        }

        return await next();
    }
}

// Enregistrement ciblé (seulement pour AddToCartRequest)
services.AddPipelineBehavior<AddToCartRequest, CartItem,
    ValidationBehavior<AddToCartRequest, CartItem>>();

MediatorFactory : éviter les deadlocks

Puisque ChannelMediator utilise un channel à lecteur unique (single reader), un handler qui appelle mediator.Send() sur le même mediator provoquerait un deadlock. La solution : IMediatorFactory.

public class ProcessOrderHandler : IRequestHandler<ProcessOrderRequest, OrderResult>
{
    private readonly IMediatorFactory _factory;

    public ProcessOrderHandler(IMediatorFactory factory) => _factory = factory;

    public async Task<OrderResult> Handle(
        ProcessOrderRequest request, CancellationToken cancellationToken)
    {
        // Crée un mediator isolé avec son propre channel et pump
        await using var mediator = (IAsyncDisposable)_factory.CreateMediator();
        var m = (IMediator)mediator;

        // Ces appels ne bloquent pas le pump principal
        var item = await m.Send(new AddToCartRequest(request.ProductCode), cancellationToken);
        await m.Send(new SendEmailCommand(request.Email, "Confirmation", "..."), cancellationToken);

        return new OrderResult(request.OrderId, item, true, true);
    }
}

Stratégies de publication des notifications

ça c'est vrament un élément important qui n'est pas géré sur jusqu'a la version 12 de MediatR, c'est le fait de traiter les notificaitons en parallele, du coup je l'ai implémenté avec un choix de stratégie.

// Séquentielle (par défaut) : chaque handler est appelé l'un après l'autre
services.AddChannelMediator(config =>
{
    config.Strategy = NotificationPublishStrategy.Sequential;
});

// Parallèle : tous les handlers sont appelés en même temps (Task.WhenAll)
services.AddChannelMediator(config =>
{
    config.Strategy = NotificationPublishStrategy.Parallel;
});
graph TB
    subgraph "Sequential"
        S1[Notification] --> H1A[Handler A]
        H1A -->|terminé| H1B[Handler B]
        H1B -->|terminé| H1C[Handler C]
    end

    subgraph "Parallel"
        S2[Notification] --> H2A[Handler A]
        S2 --> H2B[Handler B]
        S2 --> H2C[Handler C]
    end

ChannelMediator vs MediatR ?

Critère MediatR ChannelMediator
Licence Commerciale (v12+) MIT / Open-source
Architecture Invocation synchrone sur le thread appelant Channel asynchrone + pump dédié
Back-pressure Non Oui (via Channel)
API Send, Publish, ISender Send, Publish (compatible)
Pipeline IPipelineBehavior<,> IPipelineBehavior<,> (compatible)
Handlers IRequestHandler<,> IRequestHandler<,> (compatible)
Notifications INotificationHandler<> INotificationHandler<> (compatible)
Commands IRequest IRequest (compatible)
Contrats séparés Non Oui (ChannelMediator.Contracts)
Multi-TFM .NET 8+ .NET 9, .NET 10
Factory Non Oui (IMediatorFactory)

Ce que ChannelMediator apporte en plus

  1. Back-pressure native : grâce à System.Threading.Channels, le système gère naturellement la pression quand les handlers sont plus lents que les producteurs.

  2. IMediatorFactory : permet de créer des instances isolées de mediator, chacune avec son propre channel et pump, idéal pour l'orchestration de workflows complexes.

Le code source se trouve ici : https://github.com/appliman/channelmediator