RX for NET

Pomimo napiętego grafika zajęć postanowiłem przyjrzeć się najnowszej zabawce w programowaniu asynchronicznym, która się nazywa “Reactive Extensions” w skrócie RX.

 

 

Co to jest Reactive Extensions.

Programowanie asynchroniczne jest wszędzie obecne, a wraz z rozwojem sieci oraz chmury będzie dla nas codziennością. Asynchroniczne programowanie nie ogranicza się tylko do aplikacji web w aplikacjach desktopowych musimy zmierzyć się z operacjami I/O oraz innymi operacjami w tle.

Programowanie asynchroniczne jest trudne , ponieważ nic nie dzieje się w określonej kolejności . Musimy ciągle przechwytywać postępowanie naszego asynchronicznego procesu. Jest to uciążliwe i w kodzie niekoniecznie wygląda to ładnie.

Czasami też musimy się martwić wielowątkowością i wątkiem UI.

Ale no cóż, czas aby na ratunek pojawiła się biblioteka RX.

Oficjalna definicja RX  to brzmi jakoś tak: library for composing asynchronous and event-based programs using observable collections.

Brzmi dość skomplikowanie , więc dlaczego tego nie uprościć . Rx może być nazywane jako Linq do wydarzeń. Dlaczego ?

W LINQ posiadamy interface “IEnumerable<T>” dzięki temu interface mamy do dyspozycji metody rozszerzeniowe LINQ oraz wyciągnięcia pojedynczych obiektów z kolekcji. Wykonuje się to poprzez metodę “GetEnumerator”. Dzięki temu możemy wykonać na kolekcji pętle foreach.

foreach (var item in collection)
{
     //ile razy można :)
}

Ile razy to widzieliśmy w swojej aplikacji. Biorąc pod uwagę , że prawie wszystkie kolekcje w .NET są obsłużone przez interface . W pętli wyciągamy dane z kolekcji za pomocą  interface IEnumerator,  na którym w danym momencie wykonuje się metoda “move next”.

List<string> f =  new List<string>();
IEnumerator<string> enumerator = f.GetEnumerator();
while (enumerator.MoveNext())
{
    var cos = enumerator.Current;
}

Tak wyciągamy dane za pomocą interface-ów (IEnumerable\IEnumerator) , zgodnie ze wzorcem Iterator .

Enumerables IEnumerable i IEnumerator 2

Ilustracja obrazująca IEnumerable i  IEnumerator


Reactive Extension działa dokładnie odwrotnie, czyli dane są wkładane. Co to znaczy. Jak można wkładać dane. Aby to zrozumieć spójrzmy na jedną z podstawowych sytuacji w programowaniu jak obsłużenie wydarzeń e aplikacji.

public MainPage()
{
    InitializeComponent();
    button.MouseMove += new MouseEventHandler(button_MouseMove);
}

void button_MouseMove(object sender, MouseEventArgs e)
{
    //coś coś tam
}

Tutaj zostało obsłużone zdarzenie “MouseMove”. Kursor myszki zmienia swoją lokalizacje w jakimś okresie i zdarzenie informuje nas o tym; czyli dane są wkładane. 

RX daje nam do dyspozycji interface “IObservable i IObserver Działają one zgodnie ze wzorem projektowym “Obserwator” .

Iobserver Iobservable dispose

 

IObservable<T> reprezentuje klasę, która wysyła zgłoszenia , jak i tą, która śledzi działania powiedzmy prawie jak w stylu MouseMove.

Natomiast interface  IObserver<T> reprezentuje ten interface, który te zgłoszenia otrzymuje , jak i ten, który zgłasza raport na temat zmiany.

“<T>” reprezentuje  formę obiektu tego zgłoszenia & lt;string>,<int>.

IObservable jest takim dostawcą,  listonoszem. Implementuje tylko jedną metodę “Subscribe” , która wskazuje na obserwatora (obiekt), z którego chce otrzymywać informacje o zmianach. Może mieć on zero bądź wielu obserwatorów. Tak jak listonosz może wysłać wiadomości do różnych osób. Listonosz nie czyni żadnych ograniczeń co do liczby obserwatorów, jak i kolejności.

Dostawca może wysyłać 3 różne wiadomości do obserwatora (IObserver) wykonując w nim trzy metody.

  • OnNext – mówi obserwatorowi aby zmienił obecne dane, bądź je odświeżył. Ponieważ otrzymał nowe.
  • OnError - warunek, dostawca mówi obserwatorowi , że coś poszło nie tak.
  • OnCompleted - dostawaca informuje , że skończył wysłania zgłoszeń.


Dostawca ma też do dyspozycji interface “IDisposable” aby anulować niepotrzebną już subskrypcje.

Możemy dzięki tym interface-om , zmienić wydarzenia na obiekty Obserwatora , a ten obiekt może być subskrybowany i możemy do niego stosować metody LINQ.

Najlepiej pokazać trochę kodu, ponieważ sprawa nie jest tutaj tak oczywista jak w poprzednim omówionym przykładzie o IEnu.

Dla lepszego zrozumienia postanowiłem stworzyć własną klasę  Mojobserwator, która dziedziczy po interface IObserver<T>. Kod był wzorowany na przykładzie z prezentacji Dev Days 2011 ,więcej o tym na końcu artykułu.

class MojObserwator<T> :IObserver<T>
{
    public void OnCompleted()
    {
        Console.ForegroundColor = ConsoleColor.Green;
        Console.WriteLine("\t Skończone, Nie otrzymuje więcej wiadomości");
        Console.ResetColor();
    }

    public void OnError(Exception error)
    {
        Console.ForegroundColor = ConsoleColor.Red;
        Console.WriteLine("Coś się zchrzaniło stary: ");
        Console.WriteLine(error.Message);
        Console.ResetColor();
    }

    public void OnNext(T value)
    {
        Console.ForegroundColor = mainColor;
        Console.WriteLine("\t Otrzymałem nową wartość {0}", value);
        Console.ResetColor();
    }

    ConsoleColor mainColor;
    public MojObserwator(ConsoleColor mainColor = ConsoleColor.Yellow)
    {
      this.mainColor = mainColor;
    }
}

Ta klasa będzie otrzymywać komunikaty i ona w sumie określa, co ma zrobić, gdy otrzyma  3 polecenia ze swojego dostawcy,

Teraz jak zachowuje się interface IObservable<T>.

Została tutaj użyta klasa Observabel, która posiada w sobie kilka metod, które potrafią stworzyć sekwencje operacji, które będę obserwował. W tym wypadku będzie to wartość, która będzie zwiększała się o jeden, co dwie sekundy.  Gdy mamy już dostawcę musimy jeszcze tylko go subskrybować i podać mu obserwatora, który będzie otrzymywał wartości zwrotne i nimi operował na swój sposób..

Jak widać nie ma problemu aby dostawca subskrybował kolejną sekwencje pod warunkiem , że zwraca ten sam typ z nowym obserwatorem.

IObservable<long> Dostawca = Observable.Interval(TimeSpan.FromSeconds(2));

IDisposable sub1 =  Dostawca.Subscribe(new MojObserwator<long>());
Console.ReadKey();

//drugi sub 
Dostawca = Observable.Interval(TimeSpan.FromSeconds(3));
IDisposable sub2 = Dostawca.Subscribe(new MojObserwator<long>(ConsoleColor.DarkCyan));
Console.ReadKey();

//wyjątek 
Dostawca = Observable.Throw<long>(new Exception("Specjalnie to zrobiłem"));
Dostawca.Subscribe(new MojObserwator<long>());

Console.ReadKey();
//pozbycie się sub sub1.Dispose();
Console.ReadKey();
sub2.Dispose();
Console.ReadKey();

Mogę też wyrzucić wyjątek.

A na koniec mogę anulować każdą subskrypcję, czyli  już nie śledzić nowych wartości, które przychodzą. Każda  tutaj subskrypcja to interface “IDisposable” , która ma metodę “Dispose” . Po jej wykonaniu subskrypcja jest anulowana.

Działanie kodu w konsoli wygląda tak ,gdzie '’a” to znak by aplikacja przeszła dalej (patrz Console.ReadKey).

Konsolowa aplikacja i jej działanie 2
Co mogą biblioteki RX:

  • Asynchroniczne programowanie. Rx ma wbudowane wsparcie dla kolekcji obserwatorów w wątkach w tle.
  • Obsłużenie zdarzeń w przedziwny sposób. Jak np. dzięki metodzie LINQ “Skip(2)” wydarzenie będzie zachodzić dopiero za trzecim razem.
  • Możesz tworzyć obiekty obserwujące z egzystujących wydarzeń i kolekcji i używać jej potem jak zwykłą klasę. Jest nawet metoda “ToEvent()” .


Istnieją  aż cztery wersje tej biblioteki  w zależności od technologii, do której chcemy ją użyć.

Wersje RX Library
Informacje, które były pomocne dla tego wpisu: