Pomimo napiętego grafika zajęć postanowiłem przyjrzeć się najnowszej zabawce w programowaniu asynchronicznym, która się nazywa “Reactive Extensions” w skrócie RX.
Co to jest Reactive Extensions.
Programowanie asynchroniczne jest wszędzie obecne, a wraz z rozwojem sieci oraz chmury będzie dla nas codziennością. Asynchroniczne programowanie nie ogranicza się tylko do aplikacji web w aplikacjach desktopowych musimy zmierzyć się z operacjami I/O oraz innymi operacjami w tle.
Programowanie asynchroniczne jest trudne , ponieważ nic nie dzieje się w określonej kolejności . Musimy ciągle przechwytywać postępowanie naszego asynchronicznego procesu. Jest to uciążliwe i w kodzie niekoniecznie wygląda to ładnie.
Czasami też musimy się martwić wielowątkowością i wątkiem UI.
Ale no cóż, czas aby na ratunek pojawiła się biblioteka RX.
Oficjalna definicja RX to brzmi jakoś tak: library for composing asynchronous and event-based programs using observable collections.
Brzmi dość skomplikowanie , więc dlaczego tego nie uprościć . Rx może być nazywane jako Linq do wydarzeń. Dlaczego ?
W LINQ posiadamy interface “IEnumerable<T>” dzięki temu interface mamy do dyspozycji metody rozszerzeniowe LINQ oraz wyciągnięcia pojedynczych obiektów z kolekcji. Wykonuje się to poprzez metodę “GetEnumerator”. Dzięki temu możemy wykonać na kolekcji pętle foreach.
foreach (var item in collection)
{
//ile razy można :)
}
Ile razy to widzieliśmy w swojej aplikacji. Biorąc pod uwagę , że prawie wszystkie kolekcje w .NET są obsłużone przez interface . W pętli wyciągamy dane z kolekcji za pomocą interface IEnumerator, na którym w danym momencie wykonuje się metoda “move next”.
List<string> f = new List<string>();
IEnumerator<string> enumerator = f.GetEnumerator();
while (enumerator.MoveNext())
{
var cos = enumerator.Current;
}
Tak wyciągamy dane za pomocą interface-ów (IEnumerable\IEnumerator) , zgodnie ze wzorcem Iterator .
Reactive Extension działa dokładnie odwrotnie, czyli dane są wkładane. Co to znaczy. Jak można wkładać dane. Aby to zrozumieć spójrzmy na jedną z podstawowych sytuacji w programowaniu jak obsłużenie wydarzeń e aplikacji.
public MainPage()
{
InitializeComponent();
button.MouseMove += new MouseEventHandler(button_MouseMove);
}
void button_MouseMove(object sender, MouseEventArgs e)
{
//coś coś tam
}
Tutaj zostało obsłużone zdarzenie “MouseMove”. Kursor myszki zmienia swoją lokalizacje w jakimś okresie i zdarzenie informuje nas o tym; czyli dane są wkładane.
RX daje nam do dyspozycji interface “IObservable i IObserver Działają one zgodnie ze wzorem projektowym “Obserwator” .
IObservable<T> reprezentuje klasę, która wysyła zgłoszenia , jak i tą, która śledzi działania powiedzmy prawie jak w stylu MouseMove.
Natomiast interface IObserver<T> reprezentuje ten interface, który te zgłoszenia otrzymuje , jak i ten, który zgłasza raport na temat zmiany.
“<T>” reprezentuje formę obiektu tego zgłoszenia & lt;string>,<int>.
IObservable jest takim dostawcą, listonoszem. Implementuje tylko jedną metodę “Subscribe” , która wskazuje na obserwatora (obiekt), z którego chce otrzymywać informacje o zmianach. Może mieć on zero bądź wielu obserwatorów. Tak jak listonosz może wysłać wiadomości do różnych osób. Listonosz nie czyni żadnych ograniczeń co do liczby obserwatorów, jak i kolejności.
Dostawca może wysyłać 3 różne wiadomości do obserwatora (IObserver) wykonując w nim trzy metody.
- OnNext – mówi obserwatorowi aby zmienił obecne dane, bądź je odświeżył. Ponieważ otrzymał nowe.
- OnError - warunek, dostawca mówi obserwatorowi , że coś poszło nie tak.
- OnCompleted - dostawaca informuje , że skończył wysłania zgłoszeń.
Dostawca ma też do dyspozycji interface “IDisposable” aby anulować niepotrzebną już subskrypcje.
Możemy dzięki tym interface-om , zmienić wydarzenia na obiekty Obserwatora , a ten obiekt może być subskrybowany i możemy do niego stosować metody LINQ.
Najlepiej pokazać trochę kodu, ponieważ sprawa nie jest tutaj tak oczywista jak w poprzednim omówionym przykładzie o IEnu.
Dla lepszego zrozumienia postanowiłem stworzyć własną klasę Mojobserwator, która dziedziczy po interface IObserver<T>. Kod był wzorowany na przykładzie z prezentacji Dev Days 2011 ,więcej o tym na końcu artykułu.
class MojObserwator<T> :IObserver<T>
{
public void OnCompleted()
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("\t Skończone, Nie otrzymuje więcej wiadomości");
Console.ResetColor();
}
public void OnError(Exception error)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("Coś się zchrzaniło stary: ");
Console.WriteLine(error.Message);
Console.ResetColor();
}
public void OnNext(T value)
{
Console.ForegroundColor = mainColor;
Console.WriteLine("\t Otrzymałem nową wartość {0}", value);
Console.ResetColor();
}
ConsoleColor mainColor;
public MojObserwator(ConsoleColor mainColor = ConsoleColor.Yellow)
{
this.mainColor = mainColor;
}
}
Ta klasa będzie otrzymywać komunikaty i ona w sumie określa, co ma zrobić, gdy otrzyma 3 polecenia ze swojego dostawcy,
Teraz jak zachowuje się interface IObservable<T>.
Została tutaj użyta klasa Observabel, która posiada w sobie kilka metod, które potrafią stworzyć sekwencje operacji, które będę obserwował. W tym wypadku będzie to wartość, która będzie zwiększała się o jeden, co dwie sekundy. Gdy mamy już dostawcę musimy jeszcze tylko go subskrybować i podać mu obserwatora, który będzie otrzymywał wartości zwrotne i nimi operował na swój sposób..
Jak widać nie ma problemu aby dostawca subskrybował kolejną sekwencje pod warunkiem , że zwraca ten sam typ z nowym obserwatorem.
IObservable<long> Dostawca = Observable.Interval(TimeSpan.FromSeconds(2));
IDisposable sub1 = Dostawca.Subscribe(new MojObserwator<long>());
Console.ReadKey();
//drugi sub
Dostawca = Observable.Interval(TimeSpan.FromSeconds(3));
IDisposable sub2 = Dostawca.Subscribe(new MojObserwator<long>(ConsoleColor.DarkCyan));
Console.ReadKey();
//wyjątek
Dostawca = Observable.Throw<long>(new Exception("Specjalnie to zrobiłem"));
Dostawca.Subscribe(new MojObserwator<long>());
Console.ReadKey();
//pozbycie się sub sub1.Dispose();
Console.ReadKey();
sub2.Dispose();
Console.ReadKey();
Mogę też wyrzucić wyjątek.
A na koniec mogę anulować każdą subskrypcję, czyli już nie śledzić nowych wartości, które przychodzą. Każda tutaj subskrypcja to interface “IDisposable” , która ma metodę “Dispose” . Po jej wykonaniu subskrypcja jest anulowana.
Działanie kodu w konsoli wygląda tak ,gdzie '’a” to znak by aplikacja przeszła dalej (patrz Console.ReadKey).
Co mogą biblioteki RX:
- Asynchroniczne programowanie. Rx ma wbudowane wsparcie dla kolekcji obserwatorów w wątkach w tle.
- Obsłużenie zdarzeń w przedziwny sposób. Jak np. dzięki metodzie LINQ “Skip(2)” wydarzenie będzie zachodzić dopiero za trzecim razem.
- Możesz tworzyć obiekty obserwujące z egzystujących wydarzeń i kolekcji i używać jej potem jak zwykłą klasę. Jest nawet metoda “ToEvent()” .
Istnieją aż cztery wersje tej biblioteki w zależności od technologii, do której chcemy ją użyć.
Informacje, które były pomocne dla tego wpisu:
- Mike Taulty’s Blog http://mtaulty.com/CommunityServer/blogs/mike_taultys_blog/archive/2011/05/03/reactive-extensions-for-the-rest-of-us-devdays-holland-part-2.aspx
- MSDN : IObservable<T> oraz IObserver<T>,
- Use Silverlight Reactive Extensions (Rx) to build responsive UIs
- Dev Days