RxJS, real-world examples (Part 1)

Rafael Siqueira
4 min read · Jan 16, 2021

Reactive programming is steadily gaining ground in the world of imperative languages. The RxJS npm package recently passed 12 million weekly downloads, and observable streams are a vital peer dependency of many important open source projects, Angular perhaps being the most prominent of them.

While reactive programming comes more naturally to functional programmers, it imposes a steep learning curve on imperative programmers. Reactive code can also be harder to read, which makes it harder to learn from examples.

The few usage examples we do find seem to cover only abstract scenarios, such as mouse clicks or marble diagrams with parallel observables that complete after a few seconds. While the RxJS docs and RxJS Marbles (https://rxmarbles.com) can be useful for understanding the differences between mergeMap, switchMap and concatMap, they fall short when an imperative programmer is still learning how to call an asynchronous method from an observable stream.

This series will introduce a few real-world examples that finally helped me begin to grasp RxJS. I hope they will help you too.

An important note on notation: we will use a $ sign at the end of a method name to denote a method that already returns an observable stream.

In the real world, RxJS streams usually come from arrays

Contrary to most RxJS examples, in the real world you will usually start processing your stream from an array or from a promise that resolves to an array. A few real cases are database queries, lists of files or API calls.

I initially struggled to understand that these arrays first have to be transformed into streams of individual “items” before I could fully move them along the stream.

How to create observables from an array:

Keep an eye on the mergeAll() operator. It will quickly become your friend. The of() function wraps the array in an observable that emits the whole array as a single value; only after piping it through mergeAll() do you get a stream of individual items. But why would you want a stream of items, you ask?
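To make the difference concrete, here is a minimal sketch. The file names are made-up sample data, not taken from a real project. It also shows from(), which turns an array into a stream of items directly.

```typescript
import { from, of } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

// Hypothetical sample data standing in for a list of files.
const files: string[] = ['a.txt', 'b.txt', 'c.txt'];

// of() emits the whole array as ONE value.
const asArray: string[][] = [];
of(files).subscribe((value) => asArray.push(value));
console.log(asArray.length); // 1

// Piping through mergeAll() flattens it into one emission per item.
const asItems: string[] = [];
of(files)
  .pipe(mergeAll())
  .subscribe((item) => asItems.push(item));
console.log(asItems); // [ 'a.txt', 'b.txt', 'c.txt' ]

// from() on an array produces the same stream of items in one step.
const viaFrom: string[] = [];
from(files).subscribe((item) => viaFrom.push(item));
console.log(viaFrom); // [ 'a.txt', 'b.txt', 'c.txt' ]
```

The of().pipe(mergeAll()) form pays off once the array arrives wrapped in something else, such as a promise, because the same mergeAll() step works there too.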

Let’s imagine you have a SQL query that retrieves all the users in your system. You then want to send an email to each of your active users.

Calling asynchronous methods with RxJS

A few important concepts can be seen in this small piece of code:

  1. How to create an observable stream from a method that returns a promise, a.k.a. an asynchronous method, using the from() function (line 1).
  2. The filter() operator, which is similar to an if statement in an imperative execution flow (line 3).
  3. concatMap() runs the async sendEmail() method sequentially, waiting for each execution to complete before calling the next one.

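Finally, and most importantly, note the structure of the subscribe method. It resembles a promise chain: the first (success) => arrow function plays the role of then(), and the second (err) => function plays the role of catch(). The final () => function runs when the stream completes, much like finally(), with one difference: it is not called if the stream ends with an error.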
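The concepts above can be sketched as follows. This is an illustration, not the original snippet: getUsers() and sendEmail() are hypothetical stand-ins for a SQL query and an SMTP call, and the User shape is assumed. The three subscribe callbacks are written as an observer object with next, error and complete, which is equivalent to passing them as three positional arrow functions.

```typescript
import { from } from 'rxjs';
import { concatMap, filter, mergeAll } from 'rxjs/operators';

// Assumed shape of a user record.
interface User {
  email: string;
  active: boolean;
}

// Hypothetical stand-in for a SQL query that resolves to an array.
async function getUsers(): Promise<User[]> {
  return [
    { email: 'ana@example.com', active: true },
    { email: 'bob@example.com', active: false },
    { email: 'carla@example.com', active: true },
  ];
}

// Hypothetical stand-in for an SMTP call; records the order of delivery.
const sent: string[] = [];
async function sendEmail(user: User): Promise<string> {
  await new Promise((resolve) => setTimeout(resolve, 10));
  sent.push(user.email);
  return user.email;
}

// Wrapped in a promise only so callers can wait for the stream to finish.
const finished = new Promise<void>((resolve, reject) => {
  from(getUsers()) // promise -> stream with one emission (the array)
    .pipe(
      mergeAll(), // array -> one emission per user
      filter((user) => user.active), // the "if" of the stream
      concatMap((user) => sendEmail(user)), // one email at a time, in order
    )
    .subscribe({
      next: (email) => console.log(`sent to ${email}`), // like then()
      error: (err) => reject(err), // like catch()
      complete: () => resolve(), // runs only on successful completion
    });
});
```

With the sample data above, only the two active users receive an email, in the order they appear in the array.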

This example could clearly be improved by sending the emails in parallel. That is where mergeMap() is so useful.

How to send emails in parallel with RxJS the wrong way

Warning: if you try this approach, you will quickly overload your SMTP server, which will start refusing your emails. Fortunately, mergeMap() accepts an optional second argument, concurrent, that caps the number of inner subscriptions running at the same time.
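A small experiment shows the effect of that argument. In this sketch the fake sendEmail() does nothing but wait 20 ms and count how many calls are in flight at once; the addresses, the delay and the limit of 3 are all arbitrary assumptions.

```typescript
import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Ten made-up recipients.
const emails: string[] = Array.from({ length: 10 }, (_, i) => `user${i}@example.com`);

// Sends all emails with the given concurrency limit and resolves
// with the highest number of sends that were in flight at once.
function sendAll(concurrent: number = Infinity): Promise<number> {
  let inFlight = 0;
  let peak = 0;

  // Hypothetical stand-in for an SMTP call.
  const sendEmail = async (email: string): Promise<string> => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await new Promise((resolve) => setTimeout(resolve, 20));
    inFlight--;
    return email;
  };

  return new Promise<number>((resolve, reject) => {
    from(emails)
      .pipe(mergeMap((email) => sendEmail(email), concurrent))
      .subscribe({ error: reject, complete: () => resolve(peak) });
  });
}

// Unbounded first, then limited to 3 sends at a time.
const peaks = Promise.all([sendAll(), sendAll(3)]);
peaks.then(([unbounded, bounded]) => {
  console.log({ unbounded, bounded }); // { unbounded: 10, bounded: 3 }
});
```

Without the limit, all ten sends start at once; with it, mergeMap() buffers the remaining items and starts a new send only when an earlier one finishes.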

How to send emails in parallel with RxJS the right way

In both examples we assume that the getUsers$() method already returns an observable stream. If you plan on using RxJS observables, converting your asynchronous methods into methods that return observables will make your code more readable.

Converting asynchronous methods to observable returning methods

We use the if and else only to illustrate that the code of the asynchronous method could be more complex. In this simple case we could, of course, have just used the from() function.
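One possible shape for such a conversion is sketched below. It is not the original code: getUsers() is a hypothetical async method, and treating an empty result as an error is an assumption made only to give the if and else something to do.

```typescript
import { Observable, from } from 'rxjs';

// Assumed shape of a user record.
interface User {
  email: string;
  active: boolean;
}

// Hypothetical asynchronous method that resolves to an array.
async function getUsers(): Promise<User[]> {
  return [
    { email: 'ana@example.com', active: true },
    { email: 'bob@example.com', active: false },
  ];
}

// Hand-written wrapper: leaves room for extra logic around the promise.
function getUsers$(): Observable<User[]> {
  return new Observable<User[]>((subscriber) => {
    getUsers()
      .then((users) => {
        if (users.length > 0) {
          subscriber.next(users);
          subscriber.complete();
        } else {
          // Assumption: an empty result is treated as an error here.
          subscriber.error(new Error('no users found'));
        }
      })
      .catch((err) => subscriber.error(err));
  });
}

// The simple case: from() does the same job in one line.
const getUsersSimple$ = (): Observable<User[]> => from(getUsers());

// Wrapped in promises only so callers can wait for the first emission.
const firstBatch = new Promise<User[]>((resolve, reject) => {
  getUsers$().subscribe({ next: resolve, error: reject });
});
const simpleBatch = new Promise<User[]>((resolve, reject) => {
  getUsersSimple$().subscribe({ next: resolve, error: reject });
});
```

Both versions emit the array once and then complete, so either can be followed by mergeAll() to get a stream of individual users.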

Asynchronous methods with multiple promises using RxJS

As you gain experience with RxJS, you will get used to the ternary operator (? :) used in getUsers2$(). It lets you write the branch as a one-liner, which keeps your pipe succinct and flowing.
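Here is one way a getUsers2$() that chains two promises might look. Everything apart from the method name is hypothetical: connect() and queryUsers() stand in for two dependent asynchronous calls, and the ready flag exists only so both branches of the ternary can be exercised.

```typescript
import { Observable, from, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Assumed shapes, for illustration only.
interface User {
  email: string;
}
interface Db {
  ready: boolean;
}

// First promise: a hypothetical connection step.
async function connect(ready: boolean): Promise<Db> {
  return { ready };
}

// Second promise: a hypothetical query that needs the connection.
async function queryUsers(_db: Db): Promise<User[]> {
  return [{ email: 'ana@example.com' }];
}

function getUsers2$(ready: boolean = true): Observable<User[]> {
  return from(connect(ready)).pipe(
    // The ternary picks the next stream without leaving the pipe.
    concatMap((db) => (db.ready ? from(queryUsers(db)) : of([] as User[]))),
  );
}

// Wrapped in promises only so callers can wait for the first emission.
const collect = (ready: boolean): Promise<User[]> =>
  new Promise<User[]>((resolve, reject) => {
    getUsers2$(ready).subscribe({ next: resolve, error: reject });
  });

const both = Promise.all([collect(true), collect(false)]);
both.then(([connected, notConnected]) => {
  console.log(connected.length, notConnected.length); // 1 0
});
```

Both branches of the ternary return an observable of the same type, so the rest of the pipe does not need to know which one was taken.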

I will stop this tutorial here, but the series will continue with the reduce() and scan() operators and how they can help you pause your stream and wait for results to arrive before moving on.

Stay tuned for part 2 of this series.

Since we are talking about real-world examples, here are reference implementations of closer-to-reality versions of getUser$() and sendMail$(email):

Example mssql query to observable stream

Example of sending emails using nodemailer and observables

Rafael Siqueira

C, C#, Objective C, Java, JavaScript, Typescript, Python and Swift programmer. Azure, AWS and GCE. CloudFormation, Terraform and K8s.