How to Untether your WebApps with Observables and RxJS

If you’re a developer and you like to stay on top of things, chances are you’ve heard the term “Observables” by now. If you haven’t, you’re welcome, this is for you. And if you’re already familiar, well, this article might give them a new spin.
First off, I want to make clear that this take on Observables is based on my own struggles and experiences. Rather than giving you the academic blabber of what the RxJS docs and API specs state word-by-word, I’d rather explain Observables the way I would have liked someone to explain them to me.
This is my added value, but it comes with fair warning: not all terms used and approaches described in this post are one-to-one with the official docs. Where there are divergences, I’ll try to point them out so as to avoid confusing the reader.

What are Observables?

Imagine you have a set of events within a given scope, like seconds ticking while you read this post. This stream of ticking events will happen a finite number of times, typically in the hundreds if you’re an average reader, one event for each second. It could also be open-ended in time if you were an infinitely slow reader, caught in this article forever in Sartrean fashion. You might also be infinitely fast and let the clock tick just once, or you might never read it at all and have no events occur. But the real question is: who’s listening to the ticks? It could be no one, it could be just one listener, or it could be multiple. Furthermore, the moment at which each listener subscribes to the stream determines which events they receive.
ReactiveX, and its JS implementation RxJS, provides a holistic toolset to address these conceptual scenarios with Observables. Here, the events happen within the scope of an Observable instance. In other words, one Observable is the playing field on which a sequence of events occurs, so we could also think of it as a “stream”, “channel” or “pipe”. In addition, an Observable instance exposes a subscribe method, which you use to be notified of each event as it occurs.
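
To make the point about listener timing concrete, here is a minimal sketch in plain JavaScript. It does not use RxJS: TinyStream and its emit method are hypothetical names invented for this illustration, and real Observables do considerably more. It only shows that a listener receives the events emitted while it is subscribed.

```javascript
// Toy event stream (NOT RxJS): keeps a set of listeners and notifies each one.
class TinyStream {
  constructor() {
    this.listeners = new Set();
  }

  // Register a listener; returns a function that removes it again.
  subscribe(listener) {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  // Push one event to every listener that is subscribed right now.
  emit(value) {
    for (const listener of [...this.listeners]) listener(value);
  }
}

const ticks = new TinyStream();
const early = [];
const late = [];

ticks.emit(0); // nobody is listening yet, so this tick is lost

const stopEarly = ticks.subscribe(tick => early.push(tick));
ticks.emit(1);

ticks.subscribe(tick => late.push(tick));
ticks.emit(2);

stopEarly(); // the early listener leaves
ticks.emit(3);

console.log(early); // [1, 2]
console.log(late);  // [2, 3]
```

Both listeners watch the same stream, yet each ends up with a different slice of it, purely because of when they joined and left.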

Creating Observables

Manually

The simplest example of the (manual) implementation of an Observable could be:

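import { Observable } from 'rxjs';

// The function passed to the constructor runs once per subscription.
const stuff$ = new Observable(subscriber => {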
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

stuff$.subscribe(num => {
  console.log(num);
});

Example 1a
Note the subscriber parameter here. The Subscriber is one of the key components of an Observable (along with the Subscription). Also, pay attention to the complete() method, which closes the Observable and emits a completion event. Finally, notice how I appended a dollar sign to the name of the variable holding the observable: this is a common convention.
You can also create a Subject, which implements the Observable interface and lets you emit events from the outside:

const stuff$ = new Subject();
stuff$.subscribe(num => {
  console.log(num);
});
stuff$.next(1);
stuff$.next(2);
stuff$.next(3);
stuff$.complete();

Example 1b

/* Output of either: 1a and 1b*/
// 1
// 2
// 3 (complete)

With helper functions

It is not usual to instantiate an Observable manually. Instead, we usually rely on helper methods (technically creation operators – we’ll talk about more of those in the following section) like: of, from, defer, etc, which return an Observable instance. Let’s take a look at how they work and what type of observables they help you to create.

  • of: Will emit any arguments passed, in sequence, on subscribe (which makes it “cold”), and will then complete.
    Example:

    of(1, 2, 3); // same as example 1
    

    Example 2

 

  • from: Same as above, but accepts one argument only (for example an array or a promise), which it converts into an Observable. Most typically used for conversion from promises. In this case, the observable emits even if the promise has already been resolved.
    Example:

    from([1, 2, 3]); // same as example 1
    

    Example 3a

    from(new Promise(resolve => resolve(1)));
    // outputs 1 and completes
    

    Example 3b

 

  • defer: Takes a function which returns an Observable, and is called on every subscription. This is a typical example of a “cold” observable.
    Example:

    defer(() => of(1, 2, 3)); // same as example 1
    

    Example 4a

    defer(() => from(asyncApiCall()));
    // Will make call *on (each) subscription*
    

    Example 4b

 

  • fromEvent: This attaches the created Observable to the emissions of a DOM or NodeJS event. First parameter is the emitting target, second is the name of the event to forward. This is a typical case of a “hot” observable.
    Example:

    fromEvent(document, 'click'); // emits on every click
    

    Example 5

 

  • interval: This creates an observable which will fire indefinitely on a cadence defined by the parameter passed in (in milliseconds). Output is equivalent to example 7.
    Examples:

    interval(1000); // emits an incrementing, zero-offset counter
    

    Example 6
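
To illustrate what “cold” means for of and defer, here is a small sketch in plain JavaScript. It is not how RxJS implements these helpers: makeObservable, tinyOf and tinyDefer are hypothetical stand-ins written for this example, and they leave out completion, errors and unsubscription.

```javascript
// Toy stand-ins (NOT the RxJS implementations) for the idea behind `of` and `defer`.

// An "observable" here is just an object whose subscribe() runs a producer.
const makeObservable = producer => ({
  subscribe: next => producer(next),
});

// Like `of`: emits its arguments, in order, to every subscriber on subscribe.
const tinyOf = (...values) =>
  makeObservable(next => values.forEach(value => next(value)));

// Like `defer`: calls the factory again for each subscription.
const tinyDefer = factory =>
  makeObservable(next => factory().subscribe(next));

let factoryCalls = 0;
const deferred$ = tinyDefer(() => {
  factoryCalls += 1;
  return tinyOf(factoryCalls * 10);
});

const received = [];
console.log(factoryCalls); // 0: nothing happens before the first subscription

deferred$.subscribe(value => received.push(value));
deferred$.subscribe(value => received.push(value));

console.log(factoryCalls); // 2: one factory call per subscription
console.log(received);     // [10, 20]
```

Nothing runs until subscribe is called, and every subscription gets its own fresh run of the producer. That is the behaviour the text refers to as “cold”.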

 

Subscribing to Observables

Every time you subscribe to an observable, a Subscription is created and stays open (and in memory) until you unsubscribe or until the observable closes (which automatically unsubscribes). One cannot stress enough the importance of unsubscribing from uncompleted observables when the subscription is no longer necessary (e.g. in a component that is unmounted/destroyed).
Let’s take a look at this more elaborate example:

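const timer$ = new Observable(subscriber => {
  let i = 0;
  const timer = () => subscriber.next(i++);
  // Keep the interval ID: clearInterval needs it, not the callback.
  const intervalId = setInterval(timer, 1000);
  // Teardown logic, run when the subscription is closed.
  return () => {
    clearInterval(intervalId);
    subscriber.complete();
  };
});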

const timerSubscription = timer$.subscribe(num => {
  console.log(num);
});

/* Output */
// (wait 1 second...)
// 0
// 1
// 2
// … until
timerSubscription.unsubscribe();

Example 7
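
The relationship between subscribing, unsubscribing and the teardown function can be sketched without RxJS. In the snippet below, makeObservable and the manual clock object are hypothetical helpers made up for this illustration; the clock replaces setInterval so that the example runs synchronously.

```javascript
// Toy observable (NOT RxJS): subscribe() runs the producer and hands back
// an object whose unsubscribe() runs the teardown the producer returned.
function makeObservable(producer) {
  return {
    subscribe(next) {
      let closed = false;
      const teardown = producer(value => {
        if (!closed) next(value);
      });
      return {
        unsubscribe() {
          if (closed) return; // unsubscribing twice is harmless
          closed = true;
          if (typeof teardown === 'function') teardown();
        },
      };
    },
  };
}

// Hypothetical manual clock standing in for setInterval:
// tick() fires every registered callback once.
const clock = {
  callbacks: new Set(),
  tick() {
    for (const callback of [...this.callbacks]) callback();
  },
};

const timer$ = makeObservable(emit => {
  let i = 0;
  const onTick = () => emit(i++);
  clock.callbacks.add(onTick);
  // Teardown: release the resource acquired above.
  return () => clock.callbacks.delete(onTick);
});

const seen = [];
const subscription = timer$.subscribe(num => seen.push(num));

clock.tick();
clock.tick();
subscription.unsubscribe();
clock.tick(); // no callback is registered any more, nothing is recorded

console.log(seen);                 // [0, 1]
console.log(clock.callbacks.size); // 0
```

If unsubscribe were never called, the callback would stay registered on the clock forever, which is the kind of leak the paragraph above warns about.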

Handling error and completion events

Until now we’ve only mentioned regular events, but Observables may also emit two other types of events: on error, if an error occurs in the source of the observable, and on completion, when the observable closes. To subscribe to these two you can either pass an observer object (the PartialObserver type), or pass a second (error handler) and third (completion handler) parameter to the subscribe method. Note that recent RxJS versions deprecate the separate-callbacks form in favour of the observer object. Let’s see both examples:

from(apiCall()).subscribe(
  response => console.log(response),
  error => { throw error; },
  () => console.log('Observable closed')
);
// same as...
from(apiCall()).subscribe({
  next: response => console.log(response),
  error: error => { throw error; },
  complete: () => console.log('Observable closed')
});

Example 8
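
One detail worth spelling out is that error and completion are terminal: once either has fired, the observer hears nothing more. The following plain-JavaScript sketch enforces just that rule. It is a toy, not RxJS, and makeObservable is a hypothetical helper written for this example.

```javascript
// Toy observable (NOT RxJS) that enforces one rule of the observable
// contract: after error or complete, the observer hears nothing more.
function makeObservable(producer) {
  return {
    subscribe(observer) {
      let closed = false;
      producer({
        next(value) {
          if (!closed && observer.next) observer.next(value);
        },
        error(err) {
          if (closed) return;
          closed = true;
          if (observer.error) observer.error(err);
        },
        complete() {
          if (closed) return;
          closed = true;
          if (observer.complete) observer.complete();
        },
      });
    },
  };
}

const failing$ = makeObservable(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  subscriber.next(2);    // ignored: the stream has already failed
  subscriber.complete(); // ignored as well
});

const log = [];
failing$.subscribe({
  next: value => log.push(`next ${value}`),
  error: err => log.push(`error ${err.message}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next 1', 'error boom']
```

Note that the completion handler never runs here: a stream ends either with an error or with a completion, never both.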

Types of Observables

So we’ve established that each Observable is an encapsulation of a sequence of events. But here’s the twist: Observables may or may not come with an ignition key. You may hook up the Observable to an existing source of events (like an open websocket connection, a DOM event stream, user events, etc.), in which case the Observable is just the “event-pipe” to which multiple subscribers can listen at any point. Or, and this is the interesting part, you can supercharge this “pipe” to also ignite the source of events on subscription. We call these two conceptual types “hot” and “cold”, respectively:

  • “Hot” Observables: they “proxy” events from an independent source that may (or may not) have started (or even stopped) firing by the time of any given subscription. Example: a stream of MouseEvents, which exists and may be firing whether or not we’ve “attached” it to an Observable, and possibly before we do (see example 5). Note: these Observables are typically multicast to multiple observers.
  • “Cold” Observables: they are the complete package. They turn the ignition of the event source on subscription. Example: we might want to perform a network request each time we subscribe to an observable (see example 4b). Note: these Observables are typically unicast to one single observer.

Sometimes you will hear of a third type, called “warm” Observables. By the name, you might have guessed that they are a mix of both of the above types. In reality, they are cold Observables to the first subscriber (so the event source is started on the initial subscription) and hot Observables to subsequent subscribers.
A typical use is to reset the Observable when the count of subscribers (known as refCount) drops to zero, and to start a new instance of the event-emitting source on the next first subscription.
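
The “warm” behaviour with a reference count can be sketched in a few lines of plain JavaScript. This is a toy under stated assumptions, not the RxJS implementation: shareWithRefCount and the hand-driven source are hypothetical names invented here.

```javascript
// Toy sketch (NOT RxJS) of a "warm" stream: the source starts on the first
// subscription, is shared by later ones, and stops when the last one leaves.
function shareWithRefCount(startSource) {
  const listeners = new Set();
  let stopSource = null;

  return {
    subscribe(listener) {
      listeners.add(listener);
      if (listeners.size === 1) {
        // First subscriber: ignite the source (the "cold" part).
        stopSource = startSource(value => {
          for (const l of [...listeners]) l(value);
        });
      }
      return () => {
        if (!listeners.delete(listener)) return;
        if (listeners.size === 0) {
          // refCount dropped to zero: shut the source down.
          stopSource();
          stopSource = null;
        }
      };
    },
  };
}

// Hypothetical source that counts how often it is started and stopped.
let starts = 0;
let stops = 0;
let push = null;
const warm$ = shareWithRefCount(emit => {
  starts += 1;
  push = emit;
  return () => { stops += 1; push = null; };
});

const a = [];
const b = [];
const leaveA = warm$.subscribe(v => a.push(v)); // starts the source
push('x');
const leaveB = warm$.subscribe(v => b.push(v)); // joins the running source
push('y');
leaveA();
leaveB(); // last one out stops the source
const leaveC = warm$.subscribe(() => {}); // a fresh start
leaveC();

console.log(a, b);          // ['x', 'y'] ['y']
console.log(starts, stops); // 2 2
```

The second subscriber misses 'x' (the hot part), while the third one triggers a brand-new start of the source (the cold part).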

The ecosystem

Operators

We’ve already kind of seen operators when creating Observables; however, the “real” operators are the ones that react to and/or transform the Observable’s throughput. You use these within the pipe method of an Observable, and you can chain as many of them as you want. They are called in order, each forwarding its transformed value to the next, very much like then in promises. Let’s see some typical examples:

Simple operators
  • map: very much like Array.map, allows you to handle each event and transform it to a given return value.
    Example:

    of(1, 2, 3)
      .pipe(map(e => e + 1))
      .subscribe(e => console.log(e))
    /* Output: */
    // 2
    // 3
    // 4 (complete)
    

    Example 9

 

  • tap: same as map, but the return value is ignored and the throughput is unchanged. Use this operator for per-event side effects or for logging/debugging purposes.

  • reduce: similar to Array.reduce; it accumulates according to the handler function and emits the accumulator on complete only. Compare with the scan operator, which emits the accumulator on every event.
    Example:

    of(1, 2, 3)
      .pipe(reduce((acc, e) => `${acc}${e}`, ''))
      .subscribe(e => console.log(e));
    /* Output: */
    // "123" (complete)
    

    Example 10

 

  • take: only takes as many events as specified and completes thereafter (or when observable closes).
    Example:

    of(1, 2, 3)
      .pipe(take(2))
      .subscribe(e => console.log(e));
    /* Output: */
    // 1
    // 2 (complete)
    

    Example 11

 

  • first: like take(1), but will emit an error if the observable closes before emitting.
    Example:

    EMPTY // rxjs.EMPTY creates an observable that immediately closes
      .pipe(first())
      .subscribe(
        e => console.log(e),
        error => console.log(`Error: ${error.message}`),
        () => console.log('Complete!'),
      );
    /* Output: */
    // Error: no elements in sequence
    

    Example 12
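
Under the hood, the idea of chaining operators in pipe is simple: each operator takes an observable and returns a new one. The sketch below shows this in plain JavaScript. It is not RxJS, and makeObservable, tinyOf, tinyMap and tinyTap are hypothetical toy versions that ignore completion, errors and unsubscription.

```javascript
// Toy sketch (NOT RxJS): an operator is a function that takes one
// observable and returns a new one; pipe() applies them left to right.
const makeObservable = producer => ({
  subscribe: next => producer(next),
  pipe(...operators) {
    return operators.reduce((source, operator) => operator(source), this);
  },
});

const tinyOf = (...values) =>
  makeObservable(next => values.forEach(value => next(value)));

// Like `map`: transform each value on its way through.
const tinyMap = project => source =>
  makeObservable(next => source.subscribe(value => next(project(value))));

// Like `tap`: run a side effect, then pass the value on unchanged.
const tinyTap = effect => source =>
  makeObservable(next =>
    source.subscribe(value => {
      effect(value);
      next(value);
    })
  );

const debugLog = [];
const result = [];

tinyOf(1, 2, 3)
  .pipe(
    tinyMap(e => e + 1),
    tinyTap(e => debugLog.push(e)), // peek between two operators
    tinyMap(e => e * 10)
  )
  .subscribe(e => result.push(e));

console.log(debugLog); // [2, 3, 4]
console.log(result);   // [20, 30, 40]
```

The tap in the middle sees the values after the first map but before the second, which is what makes it handy for debugging a long chain.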

Advanced operators

  • switchMap, mergeMap, concatMap, exhaustMap: maps emission of source (a.k.a. “outer”) observable onto emission of nested (a.k.a. “inner”) observable.
    Example:

    interval(1000)
      .pipe(switchMap(e => apiCall(e)))
      .subscribe(e => console.log(e));
    /* Output: */
    // Result of apiCall(0)
    // Result of apiCall(1)
    // ...
    

    Example 13

    • Note: the difference between these four “combination” operators lies in how they handle overlapping or completing inner Observables. I wholeheartedly recommend the RxMarbles examples to visually compare them.

 

  • forkJoin: Very similar to Promise.all in concept. It takes Observables as separate parameters, as an array, or as an object-map, waits for all of them to complete, and then emits an array (first two cases) or an object-map (third case) of their last values.
    Example:

    forkJoin({
      foo: apiCall('foo'),
      bar: apiCall('bar'),
      baz: apiCall('baz')
    }).subscribe(
      apiResponses => {
        // do something with apiResponses.foo, apiResponses.bar or apiResponses.baz
      },
      error => console.log('One of the requests failed:', error)
    );
    

    Example 14

    • Note: this operator is actually a creation operator.
    • Note 2: if you want to handle errors individually, as opposed to the combined operation failing if a single request fails, you need to use the catchError operator on each input Observable.

 

  • combineLatest: Similar to forkJoin, but emits every time the input Observables do (once all input Observables have produced at least one value). This means it could have infinite values and may not complete, and that the input Observables don’t have to complete before producing a value.
    Example:

    combineLatest(
      apiCall('foo'),
      apiCall('bar'),
      apiCall('baz')
    ).subscribe(([foo, bar, baz]) => console.log(
      `API-Call Foo Latest: ${foo},
      API-Call Bar Latest: ${bar},
      API-Call Baz Latest: ${baz}`
    ));
    

    Example 15

    • Note: this operator is actually a creation operator.

 

  • debounce, audit, sample, throttle, buffer, window: We’ll see some of these in the common use-cases, but these operators (and their “~Time” counterparts) are ways to limit or orchestrate the timing of event emissions. You might be familiar with some of the concepts from libraries like lodash.
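
Coming back to the mapping operators from example 13: the difference between them is easiest to see when a new outer value arrives before the previous inner stream has answered. The sketch below contrasts the switchMap and mergeMap strategies in plain JavaScript. It is a toy, not RxJS: makeSubject, tinyMergeMap, tinySwitchMap and the hand-resolved “requests” are hypothetical names made up for this illustration.

```javascript
// Toy sketch (NOT RxJS) contrasting the switchMap and mergeMap strategies.
// A "stream" here is a bare-bones subject: subscribe() returns an
// unsubscribe function, next() notifies the current listeners.
function makeSubject() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    next(value) {
      for (const l of [...listeners]) l(value);
    },
  };
}

// mergeMap strategy: keep listening to every inner stream.
function tinyMergeMap(outer, project, next) {
  outer.subscribe(value => project(value).subscribe(next));
}

// switchMap strategy: drop the previous inner stream as soon as
// a new outer value arrives.
function tinySwitchMap(outer, project, next) {
  let dropPrevious = () => {};
  outer.subscribe(value => {
    dropPrevious();
    dropPrevious = project(value).subscribe(next);
  });
}

// Hypothetical "requests": one inner subject per outer value, answered by hand.
const outer = makeSubject();
const responses = { a: makeSubject(), b: makeSubject() };

const merged = [];
const switched = [];
tinyMergeMap(outer, key => responses[key], v => merged.push(v));
tinySwitchMap(outer, key => responses[key], v => switched.push(v));

outer.next('a');              // request "a" starts
outer.next('b');              // request "b" starts before "a" has answered
responses.a.next('result a'); // the late answer to "a"
responses.b.next('result b');

console.log(merged);   // ['result a', 'result b']
console.log(switched); // ['result b']
```

With the other two strategies, concatMap would wait for the inner stream of "a" to complete before starting "b", and exhaustMap would ignore "b" altogether while "a" is still active.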

 

Common use-cases

Working with async calls (a.k.a. Promises)

We’ve already learned how to create an observable from a promise in example 3b, but the opposite is quite simple too. See the following toPromise example (note that RxJS 7 deprecates toPromise in favour of lastValueFrom and firstValueFrom):

const resultPromise = timer(3000).toPromise();
resultPromise.then(result => console.log(result));

Example 16a
Or, if you’re in an async context:

const result = await timer(3000).toPromise();
console.log(result);

Example 16b
/* Output of both */
// (wait 3 seconds)
// 0 (complete)

Working with DOM events

Very often, you’ll find yourself having a stream of mouse events that need to be filtered, debounced or throttled. A typical scenario is manually detecting double-clicks. This example is kindly borrowed from learnrxjs:

const clicks$ = fromEvent(document, 'click');
clicks$.pipe(
  buffer(clicks$.pipe(throttleTime(250))),
  // if the buffered array holds more than one click, a double click occurred
  filter(clickArray => clickArray.length > 1)
).subscribe(() => console.log('Double Click!'));

Example 17

Scenarios of untethering

WebAPI calls and warm Observables

I’ve briefly mentioned “warm” Observables as a third type of Observable. Here’s where they can come in handy. Imagine the typical scenario where you have multiple components requesting WebAPI data. If they occur relatively close to one another in time, you’ll probably want to avoid having each of these resulting in a separate call, so as to minimize network traffic to the WebAPI server.
This is relatively simple to achieve with Observables:

function performNetworkRequest(): Observable<unknown> {
  // defer postpones the call until someone subscribes
  return defer(() => from(asyncApiCall()));
}

let bufferedRequest$;
function getBufferedNetworkRequest() {
  bufferedRequest$ = bufferedRequest$ || performNetworkRequest()
    .pipe(
      shareReplay(),
      finalize(() => { bufferedRequest$ = undefined; })
    );
  return bufferedRequest$;
}

Example 18
In example 18, the first subscription will trigger the network request. All subscriptions occurring while the request is “in-flight” will share it and wait for it to resolve (achieved with shareReplay); all subscribers are then notified and the Observable closes. With the finalize operator, we clear the bufferedRequest$ variable when the stream terminates, so that new calls will re-trigger the network request.
We could easily improve upon this by setting a time-based caching strategy, but this falls out of the scope of this article.
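
For comparison, the same “share the in-flight request” idea can be expressed with plain promises. This is a sketch of the concept rather than of example 18 itself: a promise starts eagerly instead of on subscription, and asyncApiCall, getSharedRequest and inFlight are hypothetical names, with asyncApiCall faked so that the snippet runs on its own.

```javascript
// Plain-promise sketch of the "share the in-flight request" idea.
// asyncApiCall is a hypothetical stand-in that resolves immediately
// and counts how often it is invoked.
let apiCalls = 0;
function asyncApiCall() {
  apiCalls += 1;
  return Promise.resolve({ id: apiCalls });
}

let inFlight;
function getSharedRequest() {
  if (!inFlight) {
    inFlight = asyncApiCall().finally(() => {
      inFlight = undefined; // settled: the next caller triggers a new request
    });
  }
  return inFlight;
}

const first = getSharedRequest();
const second = getSharedRequest(); // issued while the first is still pending

console.log(first === second); // true: both callers share one promise
console.log(apiCalls);         // 1: only one request was made

first.then(() => {
  getSharedRequest();
  console.log(apiCalls); // 2: after settling, a new call goes out
});
```

The finally callback plays the role that finalize plays in example 18: it resets the shared reference once the request has settled.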

Using libraries that implement RxJS

As time advances, more and more libraries are adopting reactive patterns. Some of them, such as Angular, have RxJS built in, which allows for greater optimization of both their internals and their external usage.
An example from Angular’s API is its HttpClient service, which handily returns Observables when performing network requests. Another is the form control, which allows easy subscription to changes in the form values (valueChanges) and reactive validation.

Keep digging tapping!

Every colleague I’ve talked to who has worked with advanced RxJS patterns has told me how this library and its patterns blew their minds. This very much happened to me as well. I’ve gone from several moments of “wow, now I get it” to “wow, now I really get it”, and I’m sure there will be more down the road.
I must also say that, despite the general boost in productivity I’ve experienced thanks to RxJS, I’ve sometimes also lost hours at a time trying to make sense of the official documentation (be sure to check learnrxjs.io, it’s a life-saver), or just by debugging issues where events were not “coming through” or “getting lost”, and that ultimately stemmed from some of my own stupid mistakes (like, for instance, not closing a source on the forkJoin). Remember to use the tap operator for debugging in between operators!
There are many other aspects of Observables I haven’t gotten into for time-reasons, such as types of Subjects (e.g. BehaviorSubject, ReplaySubject) or testing (with marbles). I encourage you to take the journey yourself, and feel free to drop us a line at sayhello@zartis.com if you’d like us to write about any related topics in the future.

Philip Thomas Casado
UI Lead @ Zartis
__________________________
Philip is one of our leading UI Engineers in the Zartis team. If you need additional information on the above topic or any other front-end related topic, feel free to drop us a message and we will connect you to him!
