RxJS: Observables, Observers and Subscribers

What is RxJS?

Let's start with the name. RxJS stands for Reactive Extensions for JavaScript. It is a library that facilitates reactive programming in JavaScript. So what is reactive programming?

Reactive Programming

Reactive programming is a paradigm that revolves around events. Modern applications are highly interactive and event-based: users expect a response whenever they interact with the application or whenever some other event occurs in it. To keep these interactions from blocking the rest of the application, most events and their responses are handled asynchronously.

Instead of running a program's code strictly from top to bottom, reactive programming lets an application respond to events independently and asynchronously, so that the rest of the application keeps working while a response is in progress.

The official RxJS documentation says: "RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code."

Since we already talked about reactive programming, let's check out what "Observables" are.

What are Observables?

An observable is like a wrapper around a stream of data, that is, multiple values delivered over a period of time. It pushes the data and controls how the data is pushed to the consumer, which is usually an observer (a set of callback functions), but we will come back to that later.

For example, in the block of code below, the observable determines what goes out and how it goes out. It sends out 1, 2, and 3 right away, then sends 4 one second later and completes.


import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

But that's not the end of it, is it? What happens when these values get pushed by an observable? Where does it push them, and what happens to the pushed values?

What are Observers?

An observer is a set of callback functions that get called whenever an observable has something to report. The observer executes a certain block of code when it receives data from the observable. Observers have three methods: next(), error(), and complete().


const observer = {
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
};

Like I mentioned earlier, an observer only gets called when the observable pushes something to it. When a value is pushed, the next() method of the observer is called with that value.

If the observable runs into an error, the error() method of the observer is called with the error value.

If the observable is done, the complete() method of the observer is called. The complete() method takes no arguments and returns nothing. It tells the observer that the observable will not push any more values.

To illustrate the methods, consider an Angular application that uses HttpClient, which relies on observables to make its HTTP calls. When the call succeeds, the observable pushes the response and the observer's next() method is called with it. If a server-side error or a timeout occurs, the observable signals an error and the observer's error() method is called instead.
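The two outcomes can be sketched without Angular. In the snippet below, fakeRequest is a hypothetical stand-in for an HTTP call: it makes no network request and is not part of HttpClient or RxJS. It simply calls the observer's methods the way an observable would, and makeObserver (also a made-up helper) records which methods were hit.

```javascript
// Hypothetical stand-in for an HTTP call; no network request is made.
// It reports its outcome through the observer, the way an observable would.
function fakeRequest(shouldFail, observer) {
  if (shouldFail) {
    observer.error(new Error('timeout'));
    return;
  }
  observer.next({ status: 200, body: 'hello' });
  observer.complete();
}

// Builds an observer that records which of its methods were called.
function makeObserver(calls) {
  return {
    next(response) { calls.push('next: ' + response.body); },
    error(err) { calls.push('error: ' + err.message); },
    complete() { calls.push('complete'); }
  };
}

const successCalls = [];
fakeRequest(false, makeObserver(successCalls));
console.log(successCalls); // [ 'next: hello', 'complete' ]

const failureCalls = [];
fakeRequest(true, makeObserver(failureCalls));
console.log(failureCalls); // [ 'error: timeout' ]
```

Note that the failing request never reaches next() or complete(): the error is the only thing the observer hears about.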

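Any or all of the observer methods can be called during a subscription: next() any number of times, followed by at most one call to either error() or complete(). An observer also does not have to define all three methods.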
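A small wrapper makes this concrete. The sketch below is a simplified model of the idea, not RxJS's actual code, and toSafeObserver is a hypothetical helper name. It skips methods the observer does not define and ignores any call that arrives after error() or complete().

```javascript
// Hypothetical helper: wraps a (possibly partial) observer so that
// missing methods are skipped and nothing is delivered after the
// stream has ended with error() or complete().
function toSafeObserver(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed && observer.next) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
    }
  };
}

const received = [];
// A partial observer: it only cares about values and completion.
const safe = toSafeObserver({
  next(x) { received.push('got value ' + x); },
  complete() { received.push('done'); }
});

safe.next(1);
safe.next(2);
safe.complete();
safe.next(3);                   // ignored: the stream already completed
safe.error(new Error('late'));  // ignored for the same reason

console.log(received); // [ 'got value 1', 'got value 2', 'done' ]
```

RxJS does comparable bookkeeping for you when you call subscribe(), which is why the observers in this article can stay as plain objects.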

Now that we know what an observable is and what an observer is, how do these two connect? How does an observer listen for values from an observable? That is where subscribers come in.

What are Subscribers?

subscribe() is the method that starts an observable. Subscribing tells the observable that something is listening for its pushes, and it is how an observer gets to receive the values the observable transmits. An observable does not push values until it is subscribed to. The subscriber itself is the object the observable pushes through: it is the argument named subscriber in the constructor callback shown earlier, and it forwards each call to the observer.

Combining the three, we get something like the code below. An observable is created and then subscribed to, and the observer listens through its three methods: next(), error(), and complete().
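To see why nothing happens before subscribing, it helps to look at a stripped-down model. The TinyObservable class below is a hypothetical, hand-rolled stand-in written for this illustration, not the RxJS Observable: it only stores the producer function and runs it when subscribe() is called.

```javascript
// A deliberately tiny stand-in for an observable, for illustration only.
// The real RxJS Observable does much more (unsubscription, error safety).
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // stored here, not run yet
  }
  subscribe(observer) {
    this.producer(observer);  // the producer runs once per subscribe() call
  }
}

const events = [];
const lazy = new TinyObservable(observer => {
  events.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
});

events.push('observable created');
lazy.subscribe({
  next(x) { events.push('got value ' + x); },
  complete() { events.push('done'); }
});

console.log(events);
// [ 'observable created', 'producer started',
//   'got value 1', 'got value 2', 'done' ]
```

'observable created' is recorded before 'producer started', which shows that creating the observable did not run the producer; only the subscribe() call did.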


import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe({
  next(x) { console.log('got value ' + x); },
  error(err) { console.error('something wrong occurred: ' + err); },
  complete() { console.log('done'); }
});
console.log('just after subscribe');

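Running the code above logs the output below. Values 1, 2, and 3 are delivered synchronously inside the subscribe() call, so they appear before "just after subscribe". Value 4 and "done" arrive about a second later, from the setTimeout callback.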

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done

A typical place where observables, observers, and subscribers come together in Angular is HttpClient, provided by HttpClientModule, which is used to make HTTP calls to the server.