IAsyncLINQ i IEnumerable<T>. LINQ od .NET 3.5 daje nam wiele możliwości do modyfikowania, filtrowania sekwencji danych. 

Co, jednak jeśli twój kod pracuje asynchronicznie. Jak wtedy ustawić te rury LINQ, aby prze-mapować dane lub je przefiltrować. 

Z .NET 6 pojawiła się nowy interfejs "IAsyncEnumerable<T>", które zostały stworzone z myślą o tak zwanych asynchronicznych strumieniach". 

W esencji "IAsyncEnumerable<T>” działa tak samo, jak  IEnumerable<T> tylko każde elementy takiej strumieniowanej kolekcji wychodzą asynchronicznie. Co nie blokuje programu, gdy wyciągniecie każdego elementu ze strumienia może nawet trwać na przykład ponad sekundę.

Postanowiłem odświeżyć wątek asynchronicznego C# ponieważ wierz mi mamy jeszcze wiele tematów do omówienia.  Zobaczmy co potrafi IAsyncEnumerable<T>.

Dlaczego IAsyncEnumerable, a nie Task<IEnumerable<T>>

Wróćmy do podstaw IEnumerable<T>. Gdy jakaś metoda zwraca IEnumerable<T> to znaczy, że dana sekwencja elementów nie została jeszcze wygenerowana. 

Pętla foreach wyciągnie pojedyncze element z tego interfejsu, a operacja "Count()" wymusi przetworzenia całej kolekcji, gdyż inaczej nie można poznać całkowitej liczby elementów w takiej leniwej kolekcji.

Dodatkowo IEnumerable<T> daje możliwość tworzenia swoich kolekcji, które są wirtualne albo i nie.

Tutaj przykładowo opakowuje tylko listę elementów.

class MyObjects : IEnumerable<MyObject>
{
    List<MyObject> mylist = new List<MyObject>();

    public MyObject this[int index]  
    {  
        get { return mylist[index]; }  
        set { mylist.Insert(index, value); }  
    } 

    public IEnumerator<MyObject> GetEnumerator()
    {
        return mylist.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

Przy użyciu słowa kluczowego yield można tworzyć coś, co ja nazywam wirtualnymi kolekcjami.

Kod pochodzi z mojego starego wpisu.

static public IEnumerable<string> Napisy()
{
    yield return "Ala";
    yield return "ma";
    yield return "radioaktywnego";
    yield return "kota";
    yield return "z";
    yield return "Marsa";
}

static void Main(string[] args)
{
    int i = 0;
    foreach (var item in Napisy())
    {
        i++;
        Console.WriteLine(item);
    }
}

A tutaj przy pomocy słowa kluczowego "yield" tworzę wirtualną kolekcję, która jest nieskończona.

public static class YieldCollections
{
    public static IEnumerable<int> RandomNumberCollection()
    {
    
        while(true)
        {
            Random r = new Random(Guid.NewGuid()
                .GetHashCode());
    
            int number = r.Next();
    
            yield return number;
        }
    }
}

Ten kod pochodzi z tego wpisu.

Wracajmy jednak do naszego głównego tematu. Teraz gdy .NET 6 oferuje nam IAsyncEnumerable rodzi się pytanie, kiedy z niego skorzystać.

Może IAsyncEnumerable powinno zastąpić Task<List<T>> albo Task<IEnumerable<T>>?

A jeśli tak to dlaczego?

Moim zdaniem korzystanie z Task, gdy wykonujesz jakąś operację asynchroniczną per iteracje dla danej kolekcji jest w porządku.

Task też super się spełnia, gdy wiemy, że chcemy zwrócić jakąś kolekcję z góry która powstała w wyniku operacji asynchronicznej. 

Oto przykład takie scenariusza. Pobieramy listę tekstów z danego REST API, który zwróci tę listę tekstów w formacie JSON. Oczywiście ta operacja jest asynchroniczna.

async Task<List<string>> GetTexts()
{
    using var client = new HttpClient();

    var texts = await client.
        GetFromJsonAsync<List<string>>
        ("/api/texts");

    return texts;
}

Oto uruchomienie tej metody

await GetTexts();

IEnumerable<string> wydaje się bezsensowny. Iteracja asynchroniczna oczywiście nie zajdzie. Sam Task powinien nam zwrócić kompletną kolekcję i opakowanie tego w IEnumerable jest tylko iluzją 

async Task<IEnumerable<string>> GetTexts2()
{
    using var client = new HttpClient();
    var ids = new int[] { 11, 22, 33 };
    foreach (var id in ids)
    {
        var text = await client.
        GetFromJsonAsync<string>
            ($"/api/customers?id={id}");

        yield return text;
    }
}

Na szczęście zachowanie leniwego pobierania danych asynchronicznie można osiągnąć w takim sposób.

async IAsyncEnumerable<string> GetTexts3()
{
    using var client = new HttpClient();

    var ids = new int[] { 11, 22, 33, 44,
        55, 66, 77 ,88, 99 };

    foreach (var id in ids)
    {
        var text = await client.
            GetFromJsonAsync<string>
            ($"/api/texts?id={id}");

        yield return text;
    }
}

Jak każdą technikę asynchroniczną tak i IAsyncEnumerable można zepsuć. Ten przykład poniżej jest jeszcze okej. Co, jeśli jednak "nazwa gry" nie przekazywana, a jest wynikiem działania jeszcze innej metody asynchronicznej. 

async IAsyncEnumerable<string> GetTexts4(string game)
{
    using var client = new HttpClient();

    var ids = new int[] { 11, 22, 33 };

    foreach (var id in ids)
    {
        var text = await client.
            GetFromJsonAsync<string>
            ($"/api/texts?id={id}&game={game}");

        yield return text;
    }
}

Oto przykład takiej metody asynchronicznej.

async Task<string> GetGame()
{
    await Task.Delay(4000);
    return "Mortal Kombat 2";
}

Jak to można zrobić źle? Opakowanie IAsyncEnumerable w Task psuje cały mechanizm strumieniowania.

//BAD IDEA
async Task<IAsyncEnumerable<string>> GetTextsFromGame()
{
    var game = await GetGame();
    return GetTexts4(game);
}

Lepiej jest to zrobić tak. Pobrać asynchronicznie wszystkie potrzebne parametry, a potem zrobić await foreach.

//GOOD IDEA
async IAsyncEnumerable<string> GetTexts5()
{
    var game = await GetGame();
    await foreach (var c in GetTexts4(game))
    {
        yield return c;
    }
}

W ten sposób możemy skorzystać z "IAsyncEnumerable"

IAsyncEnumerable, a LINQ

Używanie LINQ na leniwych kolekcjach strumieniowych jest miejscami dziwne. Zobaczmy, o co chodzi. Oto przykład metody, która zwróci mi listę folderów.

async IAsyncEnumerable<string> GetFolders()
{
    List<string> folders = new List<string>()
    {
        @"D:\__zdjecia",
        @"D:\_smieciedoanalizy",
        @"D:\----------------",
        @"D:\00",
        @"D:\0Prezentacja",
        @"D:\Camera",
    };

    foreach (var item in folders)
    {
        await Task.Delay(4000);
        yield return item;
    }
}

Oto inna metoda, która zwróci mi listę informacji o plikach w danym folderze.

async IAsyncEnumerable<FileInfo> GetFilesInfoAsync(string folder)
{
    var directory = new DirectoryInfo(folder);

    foreach (var item in directory.
           GetFiles("*", SearchOption.TopDirectoryOnly))
    {
        await Task.Delay(100);
        yield return item;
    }

}

Oto podobna metoda do poprzedniej ta tylko zwraca Taska, a nie IAsyncEnumerable.

async Task<FileInfo[]> GetFilesInfoTaskAsync(string folder)
{
    await Task.Delay(100);
    var directory = new DirectoryInfo(folder);

    return directory.
           GetFiles("*", SearchOption.TopDirectoryOnly);
}

Stworzyłem też proste metody, które sprawdzają, czy dany plik jest podejrzany. Jedna metoda jest asynchroniczna, a druga nie 

async Task<bool> IsSusAsync(FileInfo file)
{
    await Task.Delay(2000);
    Random r = new Random();

    return r.Next(1, 10) > 8;
}

bool IsSus(FileInfo file)
{
    Random r = new Random();

    return r.Next(1, 10) > 8;
}

Całą kolekcję IAsyncEnumerable można w każdej chwili zamienić na listę korzystając z ToListAsync().

List<string> foldersExampleLinqAsync = await GetFolders().ToListAsync();

Alternatywnie można zawsze stworzyć swoją listę używając "await foreach".

List<string> folders = new List<string>();
await foreach (var folder in GetFolders())
{
    folders.Add(folder);
}

To jak to działa z LINQ. Mamy Select z Select, które wyciągają dane ze dwóch strumieni. 

var collection1_1 = GetFolders()
                    .Select(f => GetFilesInfoAsync(f))
                    .Select(f => f.Where(file => IsSus(file)));

Taki kod jest poprawny i LINQ takie, jakie jest da sobie radę. W końcu select z select robi dwie pętle await foreach pod stołem.

await foreach(var fileinfos in collection1_1)
{
    await foreach(var item in fileinfos)
    {
        Console.WriteLine(item.Name);
        Console.Write($" -> {item.Length}\n");
    }

}

Taki kod jest nadal w porządku.

var collection1_2 = folders
                    .Select(f => GetFilesInfoAsync(f))
                    .Select
                    (files => files.Select
                    (file =>
                        new
                        {
                            IsSus = IsSus(file),
                            Name = file.Name,
                            FullName = file.FullName,
                            Lenght = file.Length,
                        }
                    ));

Czy tu będą problemy? W końcu wewnątrz SELECT-a robimy tym razem operację sprawdzania pliku asynchronicznie. 

var collection2_2 = folders
                    .Select(f => GetFilesInfoAsync(f))
                    .Select
                    (files => files.Select
                    (async file =>
                        new
                        {
                            IsSus = await IsSusAsync(file),
                            Name = file.Name,
                            FullName = file.FullName,
                            Lenght = file.Length,
                        }
                    ));

Taki kod spokojnie się skompiluje i zadziała. 

Problemy zaczynają się, gdy próbujemy tę asynchroniczną metodę wykonać wewnątrz wyrażenia Where.

var collection2_1 = folders
                    .Select(f => GetFilesInfoAsync(f))
                    .Select(f =>
                    f.Where(async file => await IsSusAsync(file)));

Co wtedy nam pozostaje? Zamienić metodę asynchroniczną na synchroniczną blokującą poprzez właściwość Result. W akcje desperacji możemy się podjąć takiej akcji.

var collection2_1_1 = folders
                    .Select(f => GetFilesInfoAsync(f))
                    .Select(f => f.Where(file => IsSusAsync(file).Result));

var collection2_2_1 = folders
                    .Select(f => new DirectoryInfo(f).
                    GetFiles("*", SearchOption.TopDirectoryOnly))
                    .Select
                    (files => files.Select
                    (file => 
                        new 
                        {
                            IsSus = IsSusAsync(file).Result,
                            Name = file.Name,
                            FullName = file.FullName,
                            Lenght = file.Length,
                        }
                    ));

A jak to możemy zrobić lepiej? Wypadałoby napisać swoją metodę WhereAwait(), która by nie miała problemów z operacjami "async i await" wewnątrz wyrażenia lambda

Tylko po co coś takiego pisać, gdy zapewne jest na to paczka NuGet.

Paczka Nuget System.LINQ.Async

Oto przykład działania metody WhereAwait(). Dzięki niej cała ta operacja nadal jest asynchronicznym strumieniem.

IAsyncEnumerable<string> folders = GetFolders();

var collection3_1 = folders
                    .Select( f => GetFilesInfoAsync(f))
                    .Select(f => f.WhereAwait
                    (async file => await IsSusAsync(file)));

await foreach (var fileinfos in collection3_1)
{
    await foreach (var item in fileinfos)
    {
        Console.WriteLine(item.Name);
        Console.Write($" -> {item.Length}\n");
    }
}

Paczka ta także oferuję metodę SelectAwait. Ona na tym etapie skrystalizuje nas asynchroniczny strumień.

var collection3_2 = folders
                    .SelectAwait(async f => await GetFilesInfoAsync2(f))
                    .Select(f => f.WhereAwait
                    (async file => await IsSusAsync(file)));

Ta sama paczka NuGet ma metodę "ToAsyncEnumerable()" , która potrafi zamieniać normalne listy na kolekcję IAsyncEnumerable. 

Kiedy taka konwersja ma sens, o tym w następnym przykładzie.

var collection3_3 = folders
                    .SelectAwait(async f => await GetFilesInfoAsync2(f))
                    .Select(f => f.ToAsyncEnumerable().WhereAwait
                    (async file => await IsSusAsync(file)));

Asynchronicznie sekwencyjnie, asynchronicznie współbieżnie

Do następnego przykładu stworzyłem sobie listę stron internetowych. 

var urls = new List<string>()
{
    "https://cezarywalenciuk.pl/",
    "https://docs.microsoft.com/pl-pl/aspnet/core/blazor/",
    "https://angular.io/",
    "https://pl.reactjs.org/",
    "https://vuejs.org/",
    "https://svelte.dev/",
    "https://www.javascript.com/",
    "https://www.youtube.com/",
    "https://azure.microsoft.com/",
    "https://www.amazon.pl/",
    "https://zetcode.com/csharp/httpclient/",
    "https://stackoverflow.com/questions/55686928/using-stopwatch-in-c-sharp",
    "https://www.programmingwithwolfgang.com/replace-rabbitmq-azure-service-bus-queue/",
    "https://medium.com/dotnet-hub/use-azure-key-vault-with-net-or-asp-net-core-applications-read-azure-key-vault-secret-in-dotnet-fca293e9fbb3",
    "https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/attribute-mapping.html",
    "https://www.nuget.org/packages/System.Linq.Async",
    "https://github.com/dotnet/reactive",
    "https://www.udemy.com/",
    "https://jquery.com/",
    "https://www.php.net/",
    "https://www.python.org/",
    "https://go.dev/",
    "https://docs.microsoft.com/pl-pl/dotnet/csharp/"

};

Moim zadaniem jest pobrać zawartość każdej strony internetowej z tej listy po to, aby mógł sprawdzić potem, czy te strony internetowe zawierają słowo podane przez użytkownika.

Oto rozwiązania numer 1 przy użyciu IAsyncEnumerable. Jak widzisz w tym przypadku użycie "ToAsyncEnumerable()" ma sens. Przy pomocy tej metody z Linq.Async zamieniam listę stron na asynchroniczny strumień, którego każdy elementy będzie wykonywał metodę GetStringAsync z klasy HttpClient.

async Task SolutionOne()
{
    using var client = new HttpClient();
    Console.WriteLine();
    Console.WriteLine("Wpisz szukane słowo");
    var word = Console.ReadLine();
    if (string.IsNullOrEmpty(word))
        return;

    var timer = new Stopwatch(); timer.Start();

    var results = urls.ToAsyncEnumerable()
            .SelectAwait(async url =>
                new {
                    Url = url,
                    Html = await client.GetStringAsync(url)
                })
            .Where(x => x.Html.Contains(word));


    await foreach (var result in results)
    {
        Console.ForegroundColor = ConsoleColor.Cyan;
        Console.WriteLine($"Znalezione {result.Url}");
        Console.ResetColor();
    }

    timer.Stop();
    Console.WriteLine(timer.ElapsedMilliseconds);
}

Jaki jest problem z tym rozwiązaniem. Specjalnie napisałem taki przykłady, aby pokazać, że IAsyncEnumerable nie w każdym scenariuszu jest użyteczne.

Strumień IAsyncEnumerable jest sekwencyjni. Znaczy to, że każda operacja asynchroniczna zapisana w taki sposób czeka na poprzednią.

Być może gdybyśmy szukali pierwszej lepszej strony internetowej gdzie pada dane słowo wtedy takie rozwiązanie miało sens. Przerywalibyśmy wtedy sekwencje.

My jednak wiemy, że pobierzemy zawsze wszystkie strony internetowe. Jeśli zależy nam na prędkości rozwiązania to oczywiście te strony chcemy pobrać współbieżnie, czyli równocześnie.

Oto przykład rozwiązania tego samego zadania, ale przy użyciu Task.WhenAll 

//Przetwarzanie sekwencji równolegle
async Task SolutionTwo()
{
    using var client = new HttpClient();
    Console.WriteLine();
    Console.WriteLine("Wpisz szukane słowo");
    var word = Console.ReadLine();
    if (string.IsNullOrEmpty(word))
    return;

    var tasks = urls
        .Select(async url => new
        {
            Url = url,
            Html = await client.GetStringAsync(url)
        });

    var timer = new Stopwatch(); timer.Start();

    var results2 = await Task.WhenAll(tasks);

    foreach (var result in results2.Where(x => 
        x.Html.Contains(word)))
    {
        Console.ForegroundColor = ConsoleColor.Cyan;
        Console.WriteLine($"Znalezione {result.Url}");
        Console.ResetColor();
    }

    timer.Stop();
    Console.WriteLine(timer.ElapsedMilliseconds);

}

Alternatywnie możemy skorzystać z ForEachAsync.

//Równolegle
async Task SolutionThree()
{
    using var client = new HttpClient();
    Console.WriteLine();
    Console.WriteLine("Wpisz szukane słowo");
    var word = Console.ReadLine();
    if (string.IsNullOrEmpty(word))
        return;

    var parallelOptions = new ParallelOptions() 
    { MaxDegreeOfParallelism = 4 };

    var timer = new Stopwatch(); timer.Start();

    await Parallel.ForEachAsync(urls, parallelOptions,
                async (url, ct) 
                => await FindMatch(url, word, client));

    timer.Stop();
    Console.WriteLine(timer.ElapsedMilliseconds);
}

async Task FindMatch(string url, string searchTerm, HttpClient client)
{
    var html = await client.GetStringAsync(url);
    if (html.Contains(searchTerm))
    {
        Console.ForegroundColor = ConsoleColor.Cyan;
        Console.WriteLine($"Znalezione {url}");
        Console.ResetColor();
    }
}

Co jest najszybsze? Na pewno nie "IAsyncEnumerable" po jest ono sekwencyjnie. 

Zostają więc dwaj kandydaci Task.WhenAll i Parallel.ForEachAsync. 

while (true)
{
    var options = Console.ReadKey();

    if (options.KeyChar == '1')
        await SolutionOne();
    else if (options.KeyChar == '2')
        await SolutionTwo();
    else if (options.KeyChar == '3')
        await SolutionThree();
}

W moich testach wychodzi, że to Task.WhenAll jest szybsze. Jednakże warto zaznaczyć, że operacja pobierania strony nie jest operacją deterministyczną i z różnych losowych przyczyn może ona raz zadziałać szybciej lub wolniej.

To wszystko, co musisz wiedzieć na temat IAsyncEnumerable.