What RxJS really is? Part 1

This post is the first part of the “What RxJS really is?” series and covers:

  • RxJS explained in simple terms.

  • Why use RxJS?

  • RxJS Terms and Syntax

RxJS explained in simple terms.

“RxJS is a library for composing asynchronous and event-based programs by using observable sequences.”

This is the explanation given by the RxJS team in the official documentation, and believe me, I did not understand a thing from this line. So let’s make things simple with the very simple example shown in the RxJS documentation: listening to click events and logging at most one click per second.

The vanilla JS approach

let count = 0;
let rate = 1000;
let lastClick = Date.now() - rate;
document.addEventListener('click', () => {
  if (Date.now() - lastClick >= rate) {
    console.log(`Clicked ${++count} times`);
    lastClick = Date.now();
  }
});

The RxJS approach

import { fromEvent, throttleTime, scan } from 'rxjs';

fromEvent(document, 'click')
  .pipe(
    throttleTime(1000),
    scan((count) => count + 1, 0)
  )
  .subscribe((count) => console.log(`Clicked ${count} times`));

As you can see, the RxJS version needs noticeably less hand-written logic; you don’t even have to check whether the last click happened more than a second ago. That is handled by the throttleTime operator.

At this point, you don’t need to know what the operators (throttleTime and scan are called operators) and the subscription (discussed further down) are doing in the code, but it is very important to understand how RxJS works here. RxJS and observables act as a funnel: each event passes through a series of operators, and the end result comes out the other side. This is extremely useful for building solutions to complex problems because it lets us transform the data we want to use step by step.

pssst: “Always think of RxJS and observables as a funnel where data goes through various operators that we can define to help us transform data.”

Why Use RxJS?

There are other ways to handle async operations, such as callbacks, promises, and async/await.

  • Callbacks: A callback is a function that can be called back after an async function is complete. But callbacks can be difficult to manage when working with nested async operations.

  • Promises: A promise is an object that may produce a single value some time in the future. It can only handle a single emission and is not cancelable.

  • async/await: async/await is a special syntax that allows writing asynchronous code that looks synchronous. It can also only handle a single emission and is not cancelable.

RxJS, on the other hand, is:

  • One technique to rule them all: it lets us work with different data sources using the same technique. (For example, an API call and a mouse event can both use the same technique to flow data.)

  • Compositional: Our views often require data combined from several sources. RxJS makes it easy to compose that data.

  • Watchful: RxJS can produce multiple values over time and uses a push model to notify our code when specific actions occur, making it easy for us to watch and react to user interactions or data changes.

  • Lazy: Evaluation does not start until subscription, so we can create recipes that only execute when we need the result.

  • Errors are handled gracefully: Error handling is built in.

  • Cancellable: We can cancel asynchronous actions. For example, if a user clicks on product A and then quickly clicks on product B, we can cancel the request for product A and only return product B.

pssst: Be patient and read on to see how each aspect discussed above can be used in code 😄. The key takeaway here is to understand why we would use RxJS rather than the other async techniques.

Before moving into RxJS terms and syntax, let’s look at two pseudo-code-level examples to understand more about how RxJS can be used to make our lives easier.

Example 1: Simple math addition.

// code A
x = 5
y = 3
z = x + y
x = z

// z is 8

Code A is a very basic example: we compute “z” as the sum of “x” and “y,” and then, after that value has been assigned, we change the value of “x.” “z” will still be 8, right? Of course: 8 was the value assigned when the expression was first evaluated, and “z” is not going to change because of changes made after that. But what if we need it to? What if a situation requires “z” to change whenever “x” or “y” changes?

//code B
// $ var is observed for changes
x$ = new Observable()
y$ = new Observable()
x$ = 5
y$ = 3
z$ = combine(x$, y$).pipe(map(([x, y]) => x + y));
x$ = 7

// z$ emits 8, then 10

RxJS and observables enable us to listen to variable changes. When either x$ or y$ changes, z$ reacts to it and performs the x + y operation again to recalculate the total.

psst: You don’t have to understand what combine, pipe, or map are doing here, but the important takeaway is that RxJS allows us to react to changes and have those changes propagated.

RxJS Terms and Syntax

To establish a solid foundation in understanding RxJS, we must first thoroughly understand the terminology listed below.

  • Observer/Subscriber

  • Observable

  • Subscribing

  • Unsubscribing

  • Creation function

Let’s take the example of apples on a conveyor belt here.

Pssst: When looking at data streams, always think of apples coming off a conveyor belt.

Observer/Subscriber

As an observer in our apple factory, you observe the apples as they are emitted by the conveyor. You will be notified as soon as the next apple arrives so that you can process it. You are notified of an error condition so you can handle it, and you are notified when there are no more apples, so you are done for the day. As its name implies, an observer is an object that observes and responds to notifications specified as methods: next to handle the next emitted item, error to handle an error condition, and complete to do any final processing or cleanup.

Definition of an Observer from the RxJS documentation:

“An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.”

A subscriber implements the observer interface. While an observer is the object we write to observe emitted items, inside RxJS each observer is converted to a subscriber. A subscriber is basically an observer with the additional ability to unsubscribe from an observable.

Let’s look at what an observer looks like in code:

const observer = {
    next: apple => console.log(`Apple emitted ${apple}`),
    error: err => console.log(`Error occurred: ${err}`),
    complete: () => console.log(`No more apples, go home`)
};

psst: The key takeaway here is to think that an observer is an object with three methods!
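
Since an observer is just a plain object, nothing stops us from calling its methods by hand to see what each one does. The sketch below does exactly that; it needs no RxJS at all, and the log array is only an illustrative stand-in for the console. In real code, RxJS calls these methods for us.

```javascript
// Illustrative stand-in for the console so the result is easy to inspect.
const log = [];

const observer = {
  next: (apple) => log.push(`Apple emitted ${apple}`),
  error: (err) => log.push(`Error occurred: ${err}`),
  complete: () => log.push('No more apples, go home'),
};

// Playing the role of the observable by hand: two apples, then done.
observer.next('Apple 1');
observer.next('Apple 2');
observer.complete();

console.log(log);
// ['Apple emitted Apple 1', 'Apple emitted Apple 2', 'No more apples, go home']
```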

Observable

An observable is a collection of events or values emitted over time. Events can come from user actions such as key presses, selections, and mouse moves, or from the application itself, such as the router or forms. Data can come from a back-end server via HTTP or internal structures.

An observable can emit any type of data, including:

  • Primitives and simple values: numbers, strings, dates

  • Events: mouse, key, valueChanges, routing

  • Objects

  • Arrays

  • HTTP response

  • Another observable

Observables can be:

  • Synchronous: The items are emitted immediately and in sequence, for example, by emitting items from an existing array.

  • Asynchronous: The items are emitted over time, for example, each time a user changes the quantity of items to buy.

  • Finite: The observable emits a fixed number of items and then completes, for example, the numbers in an array.

  • Infinite: The observable never completes on its own, for example, a timer that emits a count every second forever.

What does an observable look like in code?

import { Observable } from 'rxjs';

const apples$ = new Observable(appleSubscriber => {
    appleSubscriber.next('Apple 1');
    appleSubscriber.next('Apple 2');
    appleSubscriber.complete();
});

The “$” suffix is a naming convention that visually indicates that this variable can be observed as it emits items. In the constructor, we optionally provide a function that’s called when the observable is subscribed to. This function receives the subscriber. Within the function, we can use that subscriber to call next to emit values, error to raise an error, or complete to provide notification that there are no more items to emit. Here, we call next to emit two apple strings and then complete.

psst: This is not the exact way we will be creating observables in code, but it’s best to understand the long-form syntax to build the foundation.

So right now we have defined an observer and an observable.

const observer = {
    next: apple => console.log(`Apple emitted ${apple}`),
    error: err => console.log(`Error occurred: ${err}`),
    complete: () => console.log(`No more apples, go home`)
};

const apples$ = new Observable(appleSubscriber => {
    appleSubscriber.next('Apple 1');
    appleSubscriber.next('Apple 2');
    appleSubscriber.complete();
});

Will this code emit two apple strings? Nope. Let’s move to the next topic to know more!

Subscribing

In the apple factory, you start the conveyor to begin receiving emitted apples. With RxJS, we call the subscribe method on the observable to start receiving notifications and pass in the observer so the observable knows where to send those notifications to. We must subscribe to an observable. Otherwise, we receive no emissions and have nothing to observe.

import { Observable } from 'rxjs';

// observer
const observer = {
    next: apple => console.log(`Apple emitted ${apple}`),
    error: err => console.log(`Error occurred: ${err}`),
    complete: () => console.log(`No more apples, go home`)
};

// observable
const apples$ = new Observable(appleSubscriber => {
    appleSubscriber.next('Apple 1');
    appleSubscriber.next('Apple 2');
    appleSubscriber.complete();
});

// subscription
const sub = apples$.subscribe(observer);

When we subscribe, we pass in an observer to monitor that subscription and react each time an item is emitted, when an error occurs, or when the observable completes. To put this another way, as part of the subscription process, we tell the observable how we will observe it. The subscribe method returns the subscription, which represents the execution of the observable.

Now that we have subscribed, the function we passed to the constructor is executed. It first calls next; the observer gets the next notification and logs its next message to the console. It calls next again, logging a second message. Lastly, it calls complete; the observer gets the complete notification and logs its complete message.

Calling complete here automatically ends the subscription, so the observer receives nothing further. But what if we want to cancel a subscription ourselves?

Unsubscribing

In our apple factory, we turn off the conveyor before we leave to ensure we don’t end up with apples leaking everywhere. With RxJS, to stop receiving notifications, we call unsubscribe on the subscription returned from the subscribe method. Every time we subscribe to start receiving notifications, we should also unsubscribe to stop receiving those notifications. This avoids potential memory leaks in our applications.

There are actually several ways we can stop receiving notifications from an observable:

  • Completing: Calling the subscriber’s complete method automatically ends the subscription. Some creation functions and operators automatically complete after emitting all of their items. We’ll talk more about this shortly.

  • Erroring: Any uncaught error executes the observer’s error method and ends the subscription. Once an unhandled error occurs, the observable won’t emit any more items.

  • Unsubscribing: We can also unsubscribe from an observable to stop receiving notifications. Unsubscribing does not call the observer’s complete method. It simply lets the observable know that we are no longer interested in observing it.

pssst: A recap of what we have learned so far:

import { Observable } from 'rxjs';

// observable - hard-coded data source
const apples$ = new Observable(appleSubscriber => {
    appleSubscriber.next('Apple 1');
    appleSubscriber.next('Apple 2');
    appleSubscriber.complete();
});

// observer - tells the observable what to do with the emitted values
const observer = {
    next: apple => console.log(`Apple emitted ${apple}`),
    error: err => console.log(`Error occurred: ${err}`),
    complete: () => console.log(`No more apples, go home`)
};

// subscription - signals that we are ready to receive notifications
const sub = apples$.subscribe(observer);

// unsubscribe - good practice to call unsubscribe
sub.unsubscribe();

But is this really how we create our observables with a constructor? Nope. Let’s look at creation functions next.

Creation Functions

When using RxJS in an Angular application, we often don’t create our own observables. Rather, we work with the observables Angular creates for us. But there are times when we want to create one ourselves. We could create an observable using the Observable constructor, but the recommended technique is to use one of the built-in creation functions.

of is one such function. It creates an observable from the values we pass as arguments, emitting each argument in order and then completing. In the example below, we pass it a single array, so the observable emits that array as one item and completes.

Or we can use the from creation function. from creates an observable from an array or other data structure, emitting each individual value from that structure, and then completing.

import { Observable, of, from } from 'rxjs';

// long form - observable built with the constructor
const applesLongForm$ = new Observable(appleSubscriber => {
    appleSubscriber.next('Apple 1');
    appleSubscriber.next('Apple 2');
    appleSubscriber.complete();
});

const apples = ['Apple1', 'Apple2'];

// of - creates an observable, emits the array as a single item and then completes
const applesOf$ = of(apples);
// output - ['Apple1', 'Apple2']

// from - creates an observable, emits each element of the array and then completes
const applesFrom$ = from(apples);
// output - Apple1, Apple2

Summary

  • “Always think of RxJS and observables as a funnel where data goes through various operators that we can define to help us transform data.”

  • RxJS enables us to react to changes, and the changes are propagated.

  • When looking at data streams, always think of apples coming off of a conveyor belt.

  • Observable: A collection of events or values emitted over time.

  • Observer: Observes notifications from the observable and has a method for processing each kind (next(), error(), complete()).

  • Subscriber: An observer that can unsubscribe.

  • Subscribing: tells the observable that the subscriber is ready for notifications.

  • Subscription: subscribe() returns a subscription.

  • Unsubscribing: use the subscription to unsubscribe.

  • Creation functions: of and from (we can also create an observable with the new keyword).

  • Always call unsubscribe on the subscription.

Part 2 of the series will explore RxJS operators in more depth, along with other interesting RxJS topics. Say hola or follow me on Twitter or LinkedIn for any feedback, queries, and updates on the next article!! 😁