RxJS

Eine kleine Übersicht

Was ist RxJS?

  • “RxJS is a library for composing asynchronous and event-based programs by using observable sequences.”
  • “ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.”

Observables

  • Repräsentation einer beliebigen Menge von Werten
  • Werte können über eine beliebige Zeitdauer verteilt sein
  • Werte = Ereignisse (Maus-Klick) oder auch “1”, “A”

Subscriptions

  • Werte beobachten

Beispiel ohne RxJS:

var button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

Beispiel mit RxJS:

var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
  .subscribe(() => console.log('Clicked!'));

Subject

  • implementiert das Interface Observable und erweitert dieses um zusätzliche Methoden (z. B. next) zur Auslösung des Observables
  • Methode subscribe liefert Subscription
    • Observable-Zustand von coldhot: Werte werden überwacht und verarbeitet

Beispiel

const subject = new Subject<string>(); 
subject.next('1'); 
subject.subscribe((value) => console.log(value)); 
subject.next('2'); 
subject.next('3');
// Ausgabe: 2 3

Errors

const subject = new Subject<string>();
subject.subscribe()
    (value) => console.log(value),
    (error) => console.error(error),
    () => console.log('finished'));
subject.next('1');
subject.next('2');
subject.error(new Error('something went wrong'));
subject.complete();
// Ausgabe: 1 2 FEHLER

finished wird nicht ausgegeben, da Observable nach Fehler nicht mehr beendet werden kann

Operatoren

machen RxJS erst spannend…

Hier ein paar Beispiele:

Erstellung

from:

Observable.from([1, 2, 3]) 
   .subscribe((value) => console.log(value));

timer / interval:

Observable.timer(500, 100) // nach 500ms alle 100ms
   .subscribe((value) => console.log(value));

Transformation

map:

Observable.from([1, 2, 3])
   .map((x) => x * 10) 
   .subscribe((value) => console.log(value));

flatMap:

Observable.from([1, 2, 3])
   .flatMap((value) => Observable.from('A', 'B'))
   .subscribe((value) => console.log(value));
   // Ausgabe: A B A B A B

Filter

filter:

Observable.timer(500, 100)
   .filter((value) => value % 2 === 0)
   .take(5) 
   .subscribe((value) => console.log(value));
   // Ausgabe: 0 2 4 6 8

take:

Observable.interval(100)
   .take(5) 
   .subscribe((value) => console.log(value));
   // Ausgabe: 0 1 2 3 4

takeUntil:

const delayed = Observable.timer(1000);
Observable.interval(500)
   .takeUntil(delayed) 
   .subscribe((value) => console.log(value));
   // Ausgabe: 0 1

Kombination

zip:

const nameObservable = Observable.from(['Jim', 'Johnny', 'Jack']);
const surnameObservable = Observable.from(['Beam', 'Walker', 'Daniels', 'Jameson']);
 
Observable.zip(nameObservable, surnameObservable)
   .map((values) => `${values[0]} ${values[1]}`)
   .subscribe((value) => console.log(value));
   // Ausgabe: Jim Beam Johnny Walker Jack Daniels“

merge:

Observable.interval(150)
   .merge(Observable.interval(200)) 
   .take(6) 
   .subscribe((value) => console.log(value));
   // Ausgabe: 0 0 1 1 2 2

Utility

do:

Observable.from(['Jim', 'Johnny', 'Jack'])
   .do((name) => console.log(name)) 
   .subscribe();

delay:

Observable.interval(100)
   .delay(300) 
   .take(5) 
   .subscribe((value) => console.log(value));

Fehlerbehandlung

catch:

Observable.interval(100)
   .do(() => {throw Error('something went wrong')})
   .catch((error) => 'fixed it!')
   .subscribe((value) => console.log(value));
   // Ausgabe: fixed it!

retry:

Observable.from(['Jim', 'Johnny', 'Jack'])
   .do((name) => console.log(name)) 
   .do(() => {throw Error('something went wrong')})
   .retry(2) 
   .subscribe((value) => console.log(value));
   // Ausgabe: Jim Jim Jim FEHLER

Quellen