SubsNr: 5 GraphQL ma możliwość  poinformowania swoich klientów, gdy zajdzie jakieś zdarzenie. Korzysta on z modelu subskrypcji.

W tym wpisie pokaże Ci jak to wszystko napisać w ASP.NET Core po stronie serwera i po stronie klienta. 

 

Co to jest subskrypcja?

Klientem GraphQL API może być każdy w naszym przypadku jest to jeszcze inna aplikacja ASP.NET Core. Może to być jednak aplikacja JavaScript, która korzysta z Apollo.

Każdy klient bez względu na to, czym on jest ma możliwość utworzenia sobie takiego obserwatora do pewnych zdarzeń, które są zdefiniowane w GrapQL API.

Diagram GraphQL

W przypadku naszej aplikacji demo, Sklepu Smoków, klient chciałby dostawać wiadomości, gdy pojawi się jakaś nowa opinia na temat danego smoka utworzona przez innego użytkownika.

Dlaczego chciałby być on powiadomiony? Chociażby poto aby mógł odświeżyć swoją stronę i zobaczyć tą nową opinię. 

Co będzie zawierała nasza Schema w naszym API?

Diagram GraphQL 3 kanały Query, Subscription, Mutation

Jak pamiętasz z poprzednich wpisów w GraphQL mamy do dyspozycji w Schema:

  • Query : które służy do wysłania poleceń JSON służących do odczytywania danych
  • Mutation :które służą do wysłania poleceń JSON, które będą modyfikowały dane. Dane można modyfikować poprzez dodawania,kasowanie, jak i aktualizowanie.

Teraz dochodzą nam Subskrypcje i jest to trzeci kanał, który możemy obsłużyć.

Polega to na tym, że każdy klient, który został subskrybentem na dane zdarzenie powinien dostać wiadomość, gdy te zdarzenie zajdzie. 

Wiele klientów nasłuchuje GraphQL

Jak to się dzieje nie jest częścią specyfikacji GraphQL.

Jeśli chodzi o kod możesz się domyślać, że w przypadku .NET zapewne gdzieś pojawią się kolekcję typu Observable.

Jeśli chodzi o mechanikę działania to najczęściej do tego są użyte WebSocket-y.

API GraphQL komunikuje zdarzenie odpowiednim typem wiadomości, który jest kolejną klasą dziedziczącą po jeszcze innym typie ObjectGraphType

Do notyfikacji potrzebujemy obiektu, który będzie trzymał historię wiadomość, które zostały już wysłane. W tym samym obiekcie dobrze by było wysłać nową wiadomość, gdy dane zdarzenie zajdzie.

Tutaj także zastanawiałem się nad nazewnictwem. Trzeba jednak przyznać, że słowo "event" jest bardzo nadużywane w programowaniu i ma wiele znaczeń. Dlatego w tym przypadku zamiast słowa "Event" zastosowałem słowo "Message". 

Jak będzie działał nasz kod GraphQL

W .NET mamy interfejs na takie scenariusze i nazywa się on IObservable.  On reprezentuje strumień danych.

Wchodzimy w świat programowania reaktywnego. 

Łączymy tą referencję do naszej kolekcji IObservable z systemem Subskrypcji w Schema.

Diagram diagramem, ale jak to wygląda w kodzie. Wracam do naszego projektu API i dodajemy te omówione mechanizmy. 

Dodanie subskrypcji w Schema

Skoro zaczynamy programować reaktywne to znaczy, że musimy zainstalować tą paczkę NuGet "System.Reactive".

Typy jak ISubject, IObservable nie są dostępne w gołym .NET-cie

System.Reactive paczka Nuget

Do projektu dochodzą nowe klasy. Umieściłem je w folderze Messaging.

Nowe klasy do projektu DragonShop.API

Mamy naszą wiadomość o dodaniu nowej opinii.

public class OpinionAddedMessage
{
    public int DragonId { get; set; }
    public string Title { get; set; }
}

Oto nasza klasa, która będzie wysłała wiadomości.

public class OpinionMessageService
{
    private readonly ISubject<OpinionAddedMessage> _messageStream =
        new ReplaySubject<OpinionAddedMessage>(1);

    public OpinionAddedMessage AddOpinionAddedMessage(DragonExpertOpinion op)
    {
        var message = new OpinionAddedMessage
        {
            DragonId = op.DragonId,
            Title = op.Title
        };
        _messageStream.OnNext(message);
        return message;
    }

    public IObservable<OpinionAddedMessage> GetMessages()
    {
        return _messageStream.AsObservable();
    }
}

W polu prywatnym w typie ISubject będziemy przechowywać wiadomości do wysłania.

W metodzie OpinionAddedMessage z encji opinii przepisujemy pola i tworzymy naszą wiadomość, która trafia do strumienia poprzez metodę OnNext().

Metoda OnNext() wyśle wiadomość do wszystkich subskrybentów. Po "OnNext" zwracam wiadomość.

W metodzie GetMessage zamienimy ten strumień na typ IObservable. Dlaczego tak? W taki sposób klienci będą obserwować czy jak wolisz subskrybować ten strumień.

Dodajemy teraz trzeci kanał do naszej Schema. Zanim jednak do tego przejdziemy musi określić nowy typ danych, który będzie wystawiony na świat w GraphQL.

Nowy typ dla smoków w GraphQL

Oto typ reprezentujący w GraphQL naszą wiadomość.

public class OpinionAddedMessageType
    : ObjectGraphType<OpinionAddedMessage>
{
    public OpinionAddedMessageType()
    {
        Field(t => t.DragonId);
        Field(t => t.Title);
    }
}

Gdy to mamy teraz tworzymy definicję kanału Subskrypcji podobnie jak zrobiliśmy to z Query i Mutation.

DragonSubscription dodanie

Tutaj dodajemy nazwę kanału "Subscription"  oraz metody, które będzie on obsługiwał. 

Dla "opinionAdded" zwrócimy OpinionAddedMessage, a mechanizm subskrypcji opiera się na kolekcji IObservable, która zostanie zwrócona przez metodę GetMessages()

public class DragonSubscription : ObjectGraphType
{
    public DragonSubscription(OpinionMessageService messageService)
    {
        Name = "Subscription";
        AddField(new EventStreamFieldType
        {
            Name = "opinionAdded",
            Type = typeof(OpinionAddedMessageType),

            Resolver = new FuncFieldResolver<OpinionAddedMessage>
            (c => c.Source as OpinionAddedMessage),

            Subscriber = new EventStreamResolver<OpinionAddedMessage>
            (c => messageService.GetMessages())
        });
    }
}

Dodajemy nasz trzeci kanał w definicji Schema dla smoków.

public class DragonSchema : Schema
{
    private readonly DragonShopDbContext _dbContext;
    private readonly OpinionMessageService _messageService;

    public DragonSchema(DragonShopDbContext dbContext,
        OpinionMessageService messageService,
        IServiceProvider sp) : base(sp)
    {
        _dbContext = dbContext;
        _messageService = messageService;

        Subscription = new DragonSubscription(_messageService);

        Query = new DragonQuery
           (new DragonRepository(_dbContext));

        Mutation = new DragonMutation(
            new DragonExpertOpinionRepository(_dbContext));
    }
}

Robimy tutaj wstrzykiwanie zależności z "OpinionMessageService". Aby to działało musimy dodać konfigurację.

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddSingleton<OpinionMessageService>();

Będziemy korzystać z WebSocketów więc potrzebujemy paczki NuGet "GraphQL.Server.Transports.WebSockets"

GraphQL.Server.Transports.WebSockets paczka NuGet

Dla instalatora GraphQL dodajemy metodę "AddWebSockets()".

services.AddGraphQL((o, p) =>
 {
     var logger = p.GetRequiredService<ILogger<Startup>>();
     o.UnhandledExceptionDelegate = ctx =>
         logger.LogError("{Error} occurred", ctx.OriginalException.Message);

})
.AddWebSockets()
.AddGraphTypes(ServiceLifetime.Scoped)
.AddUserContextBuilder(httpContext => new GraphQLUserContext { User = httpContext.User })
.AddDataLoader()
.AddNewtonsoftJson();

Oczywiście chcielibyśmy dodać zdarzenie/wiadomość dodania opinii w trakcie jej dodawania, a to znaczy, że musimy zmodyfikować DragonMutation.

public class DragonMutation : ObjectGraphType
{
    public DragonMutation(DragonExpertOpinionRepository dragonExpertOpinionRepository,
        OpinionMessageService  opinionMessageService)
    {
        Field<DragonOpinionType>(
            "createOpinion",
            arguments: new QueryArguments(
                new QueryArgument<NonNullGraphType<DragonExpertOpinionInputType>> { Name = "opinion" }
                )
              ,
              resolve: context =>
             {
                 var op = context.GetArgument<DragonExpertOpinion>("opinion");

                 var r = dragonExpertOpinionRepository.AddExpertOpinionAsync(op);
                 opinionMessageService.AddOpinionAddedMessage(op);

                 return r;

             });
    }
}

Do DragonMutation doszła zależności w konstruktorze, a to znaczy, że muszę zmodyfikować kod DragonSchema.

public class DragonSchema : Schema
{
    private readonly DragonShopDbContext _dbContext;
    private readonly OpinionMessageService _messageService;

    public DragonSchema(DragonShopDbContext dbContext,
        OpinionMessageService messageService,
        IServiceProvider sp) : base(sp)
    {
        _dbContext = dbContext;
        _messageService = messageService;

        Subscription = new DragonSubscription(_messageService);

        Query = new DragonQuery
           (new DragonRepository(_dbContext));

        Mutation = new DragonMutation(
            new DragonExpertOpinionRepository(_dbContext), 
            _messageService);
    }
}

DocumentExecuter i problem NOT_SUPPORTED

Uruchamiamy naszą aplikację. W oknie GraphQL Playground wpisujemy polecenie dla kanały subskrypcji i...

subscription {
  opinionAdded{
    dragonId,
    title
  }
}

Dostajemy wyjątek "NOT_SUPPORTED". Dlaczego? Może Cię ciekawić, ale inne tutoriale, które czytałem na temat GraphQL nie mają tego problemu. Zobaczmy w czym jest problem w konsoli aplikacji.

problem NOT_SUPPORTED

Klasa DocumentExecuter nie wspiera subskrypcji od pewnej aktualizacji paczki NuGet. Sugeruja aby użyć klasy SubsritpionDocumentExecuter z paczki NuGet "GraphQL.SystemReactive".

DocumentExecuter does not support executing subscriptions. You can use SubscriptionDocumentExecuter from GraphQL.SystemReactive package to handle subscriptions. occurred

Nie powiem, że dla ta zmiana wydaje się idiotyzmem. Ściągnąłem kod paczki GraphQL i możesz zobaczyć, że rzeczywiście ta paczka chamsko wyrzuca wyjątek, gdy chcesz korzystać z kanału Subskrypcji. 

/// <summary>
/// Returns an instance of an <see cref="IExecutionStrategy"/> given specified execution parameters.
/// <br/><br/>
/// Typically the strategy is selected based on the type of operation.
/// <br/><br/>
/// By default, query operations will return a <see cref="ParallelExecutionStrategy"/> while mutation operations return a
/// <see cref="SerialExecutionStrategy"/>. Subscription operations return a special strategy defined in some separate project,
/// for example it can be SubscriptionExecutionStrategy from GraphQL.SystemReactive.
/// </summary>
protected virtual IExecutionStrategy SelectExecutionStrategy(ExecutionContext context)
{
    // TODO: Should we use cached instances of the default execution strategies?
    return context.Operation.OperationType switch
    {
        OperationType.Query => ParallelExecutionStrategy.Instance,
        OperationType.Mutation => SerialExecutionStrategy.Instance,
        OperationType.Subscription => throw new NotSupportedException($"DocumentExecuter does not support executing subscriptions. You can use SubscriptionDocumentExecuter from GraphQL.SystemReactive package to handle subscriptions."),
        _ => throw new InvalidOperationException($"Unexpected OperationType {context.Operation.OperationType}")
    };
}

Możesz dodać paczkę "GraphQL.SystemReactive", ale to magicznie nie rozwiąże problemu.

GraphQL.SystemReactive paczka NuGet

Ta paczka rzeczywiście zawiera klasę "SubsritpionDocumentExecuter", tylko jak widzisz w kodzie wspiera ona tylko kanał subskrypcji.

Myślałem, że tylko tak mi się wydaje, ale sprawdziłem.

public class SubscriptionDocumentExecuter : DocumentExecuter
{
    public SubscriptionDocumentExecuter()
    {
    }

    public SubscriptionDocumentExecuter(IDocumentBuilder documentBuilder, IDocumentValidator documentValidator, IComplexityAnalyzer complexityAnalyzer)
        : base(documentBuilder, documentValidator, complexityAnalyzer)
    {
    }

    protected override IExecutionStrategy SelectExecutionStrategy(ExecutionContext context)
    {
        return context.Operation.OperationType switch
        {
            OperationType.Subscription => SubscriptionExecutionStrategy.Instance,
            _ => base.SelectExecutionStrategy(context)
        };
    }
}

Na tym etapie myślałem, że muszę napisać swój własny Middleware albo Kontroler, który będzie przełączał się pomiędzy DocumentExecuter, a SubscriptionDocumentExecuter w zależności od definicji polecenia, ale taki mechanizm już w swoim opisie wydaje się bardzo lewy.

To trochę za dużo roboty, aby dodać 3 kanał do GraphQL API. Potem mnie olśniło, że zapewne ten mechanizm DocumentExecuter i SubscriptionDocumentExecuter implementuje interfejs "IDocumentExecuter".

Wystarczy, że wstrzyknę któryś z nich. Problem jednak polega na tym, że DocumentExecuter  wspiera Query i Mutation, a SubscriptionDocumentExecuter wspiera tylko subskrypcje.

Pomysł z własnym middleware, który by przełączał te klasy jest bardzo chybiony. Możesz zobaczyć moją niekompletną implementację tego pomysłu.

public class GraphQLMiddleware
{
    private readonly RequestDelegate _next;
    private readonly GraphQLSettings _settings;
    private readonly IDocumentExecuter _executer;
    private readonly IDocumentExecuterSub _documentExecuterSub;
    private readonly IDocumentWriter _writer;

    public GraphQLMiddleware(
        RequestDelegate next,
        IOptions<GraphQLSettings> options,
        IDocumentExecuter executer,
        IDocumentWriter writer, IDocumentExecuterSub documentExecuterSub)
    {
        _next = next;
        _settings = options.Value;
        _executer = executer;
        _writer = writer;
        _documentExecuterSub = documentExecuterSub;
    }

    [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:Naming Styles", Justification = "ASP.NET Core convention")]
    public async Task Invoke(HttpContext context, ISchema schema)
    {
        if (!IsGraphQLRequest(context))
        {
            await _next(context);
            return;
        }

        await ExecuteAsync(context, schema);
    }

    private bool IsGraphQLRequest(HttpContext context)
    {
        return context.Request.Path.StartsWithSegments(_settings.GraphQLPath)
            && string.Equals(context.Request.Method, "POST", StringComparison.OrdinalIgnoreCase);
    }

    private async Task ExecuteAsync(HttpContext context, ISchema schema)
    {
        var start = DateTime.UtcNow;

        var request = await context.Request.Body.FromJsonAsync<GraphQLRequest>(context.RequestAborted);

        ExecutionOptions options = new ExecutionOptions();
        options.Schema = schema;
        options.Query = request.Query;
        options.OperationName = request.OperationName;
        options.Inputs = request.Variables;
        options.UserContext = _settings.BuildUserContext?.Invoke(context);
        options.EnableMetrics = _settings.EnableMetrics;
        options.RequestServices = context.RequestServices;
        options.CancellationToken = context.RequestAborted;

        ExecutionResult result = null;
        //TUTAJ PRZEŁĄCZAJ
        result = await _executer.ExecuteAsync(options);


        if (_settings.EnableMetrics)
        {
            result.EnrichWithApolloTracing(start);
        }

        await WriteResponseAsync(context, result, context.RequestAborted);
    }

    private async Task WriteResponseAsync(HttpContext context, ExecutionResult result, CancellationToken cancellationToken)
    {
        context.Response.ContentType = "application/json";
        context.Response.StatusCode = 200; // OK

        await _writer.WriteAsync(context.Response.Body, result, cancellationToken);
    }

Patrząc na kod "SubscriptionDocumentExecuter" i "DocumentExecuter" i na mechanizm wewnątrz GraphQL zdałem sobie sprawę, że przecież mogę napisać swój DocumentExecuter, który wspiera wszystkie 3 kanały.

Własny DocumentExecuter 

Oto mój własny DocumentExecuter, który robi to samo tylko łącznie co klasy "SubscriptionDocumentExecuter" i "DocumentExecuter".

public class MyDocumentExecuter : DocumentExecuter, IDocumentExecuter
{
    protected override IExecutionStrategy SelectExecutionStrategy(ExecutionContext context)
    {
        return context.Operation.OperationType switch
        {
            OperationType.Query => ParallelExecutionStrategy.Instance,
            OperationType.Mutation => SerialExecutionStrategy.Instance,
            OperationType.Subscription => SubscriptionExecutionStrategy.Instance,
            _ => base.SelectExecutionStrategy(context)
        };
    }
}

Mam więc wsparcie dla 3 kanału GraphQL API pomimo złośliwości twórców. Teraz pozostaje nam powiedzieć GraphQL aby on wewnątrz siebie korzystał z mojej implementacji.

services.AddSingleton<IDocumentExecuter, MyDocumentExecuter>();

Uruchamiamy aplikację i komunikat "Listening ..." informuje nas, że nasłuchujemy naszego strumienia. Na razie nic w nim się nie dzieje, ponieważ nie mamy nowych opinii, które mogły być, dodawać 

Własny DocumentExecuter działa nasłucuhuje GraphQL Playground

Możemy to przedstawić mają dwie zakładki aplikacji PlayGround GraphQL.

dwie zakładki aplikacji PlayGround GraphQL.

W drugim oknie piszemy polecenie dodawania nowej opinii. Stworzyliśmy go w poprzednim wpisie w tej serii.

Oto treść zapytania:

mutation($opinion: opinionInput!) {
  createOpinion(opinion: $opinion){
    id,title
  }
}

A to JSON zmiennych:

{
  "opinion": {
    "title": "Wonderfull",
    "review": "yeah",
    "dragonId": 2
  }
}

GraphQL Mutation

Możemy zobaczyć, że nasz kanał subskrypcji wysłał na wiadomość, że nowa opinia została dodana.

GraphQL Playground Subscription

Aplikacja konsolowa, która nasłuchuje strumień

Mamy stronę internetową, która korzysta z GraphQL API. Dziś stworzę aplikację, która nasłuchuje strumień.

Tym razem skorzystamy z paczki NuGet GraphQL.Client i GraphQL.Client.Serializer.Newstonsoft na całego.

 paczki NuGet GraphQL.Client i GraphQL.Client.Serializer.Newstonsoft

Czysty kod z naszym klientem HTTPClient byłby bardziej złożony niż zazwyczaj. Potrzebujemy modeli, które powstaną z deserializowanego JSON-a

{
  "data": {
    "opinionAdded": {
      "dragonId": 2,
      "title": "Wonderfull"
    }
  },
  "extensions": {}
}

Oto nasz projekt:

Projekt konsolowy który będzie nasłuchiwał

Oto nasze modele:

public class OpinionAddedMessageModel
{
    public int DragonId { get; set; }
    public string Title { get; set; }
}

public class OpinionAddedMessageModelContainer
{
    public OpinionAddedMessageModel OpinionAdded { get; set; }
}

Kod klienta, który będzie odpytywał wygląda tak :

public class DragonGraphSubsClientFromNuget
{
    private readonly IGraphQLClient _client;


    public DragonGraphSubsClientFromNuget(IGraphQLClient client)
    {
        _client = client;
    }

    public void SubscribeToUpdates()
    {
        var userJoinedRequest = new GraphQL.GraphQLRequest
        {
            Query = @"
            subscription {
                  opinionAdded{
                    dragonId,
                    title
                  }
                }"
        };

        IObservable<GraphQLResponse<OpinionAddedMessageModel>> subscriptionStream
            = _client.CreateSubscriptionStream<OpinionAddedMessageModel>(userJoinedRequest);

        var subscription = subscriptionStream.Subscribe(response =>
        {
            Console.WriteLine($"New opinion Added '{response.Data.DragonId}' ");
        });

    }
}

Łączymy się ze strumieniem, gdy nowa opinia zostanie dodana wtedy wyświetlimy w konsoli ID smoka, do którego została dodana opinia.

Oto kod konsoli :

class Program
{
    static void Main(string[] args)
    {
        HttpClient httpclient = new HttpClient();
        httpclient.BaseAddress = new Uri("https://localhost:5001/graphql");
        httpclient.Timeout = new TimeSpan(0, 0, 30);
        httpclient.DefaultRequestHeaders.Clear();

        var opt = new GraphQLHttpClientOptions();
        opt.EndPoint = new Uri("https://localhost:5001/graphql");
        var c = new GraphQLHttpClient(opt,
                            new NewtonsoftJsonSerializer(),
                            httpclient);

        DragonGraphSubsClientFromNuget client =
            new DragonGraphSubsClientFromNuget(c);

        client.SubscribeToUpdates();

        Console.WriteLine("Wait");
        Console.ReadLine();
    }
}

Zobaczmy czy ten klient konsolowy słucha strumienia.

Debugowanie kodu w Visual Studio

Jak widać do konsoli dodała się nowa linijka, gdy dodałem nową opinię, czyli nasłuchiwanie strumienia to działa.

Aplikacja działa. Mamy strumień GraphQL

Udało się. Na tym na razie kończę ten cykl pisania serwera i klienta GraphQL. Nasz klient obsługuje zapytania, modyfikację oraz subskrypcje. 

O GraphQL można jeszcze opowiadać, ale jeśli to zrobię to zapewne nie będzie to kolejny wpis do tego cyklu.

Ja teraz mogę trzymać kciuki, że te wpisy będą aktualne. Pisząc te wpisy zauważyłem, że większość kodu, poradników do GraphQL z ubiegłego roku pracujących na .NET CORE 3.2 jest już bardzo nieaktualnych. Mam nadzieje, że biblioteki nie będą już wprowadzała szalonych zmian jak, chociażby ta z niedziałająca domyślnie subskrypcją.

Do zobaczenia :)