Operators in RxJS - Simple & Code Examples

One of the key features of RxJS is its ability to transform, filter, and combine data streams using operators. Most operators are functions that take an Observable as input and return a new Observable as output, leaving the source Observable unchanged. Creation operators are the exception: they build an Observable from some other data source. Chained together, operators form data processing pipelines that cover everything from simple transformations to advanced filtering and merging. With a large number of built-in operators and the ability to write custom ones, RxJS provides a powerful toolkit for building robust, scalable, and responsive applications.
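To make the "Observable in, new Observable out" idea concrete, here is a minimal sketch of a custom operator. The name multiplyBy is made up for illustration and is not part of RxJS; it simply wraps the built-in map operator.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical custom operator (not part of RxJS): multiplies each number by `factor`.
// It returns a function that takes a source Observable and returns a new one.
function multiplyBy(factor: number) {
  return (source: Observable<number>): Observable<number> =>
    source.pipe(map(value => value * factor));
}

const multiplied: number[] = [];

// of() emits synchronously, so the array is filled as soon as subscribe() returns
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe(value => multiplied.push(value));

console.log(multiplied); // [10, 20, 30]
```

Because the operator is an ordinary function, it can be passed to pipe() exactly like a built-in one.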

Creation Operators

Creation operators are used to create Observables from various data sources, such as arrays, events, timers, and promises. They are a convenient way to start a data stream and emit values over time. Some examples of creation operators include of, from, interval, fromEvent, and defer.

Example in TypeScript

import { from } from 'rxjs';

const data = [1, 2, 3, 4, 5];

from(data).subscribe(value => {
  console.log(value);
});

In this example, we use the from operator to create an Observable that emits the elements of the data array one at a time and then completes. We subscribe to the resulting Observable and log each value to the console.

The from operator is useful for creating Observables from various data sources, such as arrays, promises, and iterables. It's often used to start a data stream and emit values over time.
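As a rough illustration of those other sources, the sketch below feeds from a Set, a string, and a Promise, and contrasts it with of, which emits its argument as a single value. The variable names are ours, chosen only for this example.

```typescript
import { from, of } from 'rxjs';

// An iterable: each element of the Set is emitted in turn
const fromSet: number[] = [];
from(new Set([1, 2, 2, 3])).subscribe(value => fromSet.push(value));

// A string is array-like, so from() emits it character by character
const fromString: string[] = [];
from('abc').subscribe(char => fromString.push(char));

// of() does not unpack its argument: the whole array is one emission
const ofArray: number[][] = [];
of([1, 2, 3]).subscribe(arr => ofArray.push(arr));

// A Promise: the resolved value is emitted asynchronously, then the Observable completes
from(Promise.resolve('done')).subscribe(value => console.log(value));

console.log(fromSet);    // [1, 2, 3]
console.log(fromString); // ['a', 'b', 'c']
console.log(ofArray);    // [[1, 2, 3]]
```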

Custom Creation Operators

Custom creation operators can be useful when working with data sources that do not have built-in creation operators, or when creating observables with specific behaviors or requirements. They offer developers a high degree of flexibility and control over their observables.

Example in TypeScript

import { Observable } from 'rxjs';

function customCreationOperator() {
  return new Observable(subscriber => {
    // emit values over time
    subscriber.next('value 1');
    setTimeout(() => {
      subscriber.next('value 2');
    }, 2000);
    setTimeout(() => {
      subscriber.next('value 3');
      subscriber.complete();
    }, 4000);
  });
}

// subscribe to the custom observable
customCreationOperator().subscribe(value => console.log(value));

In this example, we define a custom creation operator called customCreationOperator() that builds a new Observable with the Observable constructor. We use the next() method to emit three values: the first immediately, the second after 2 seconds, and the third after 4 seconds. Right after the third value, we call complete() to signal the end of the Observable. Finally, we subscribe to the custom Observable and log the emitted values to the console.
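One thing the example above leaves out is cleanup: if a subscriber unsubscribes early, the pending timers still fire. The function passed to the Observable constructor can return a teardown function for this purpose. Below is a minimal sketch; ticker and the events array are hypothetical names used only for illustration.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// Hypothetical creation function: emits an increasing counter every `periodMs` milliseconds
function ticker(periodMs: number): Observable<number> {
  return new Observable<number>(subscriber => {
    let count = 0;
    const id = setInterval(() => subscriber.next(count++), periodMs);

    // Teardown logic: runs when the subscriber unsubscribes
    return () => {
      clearInterval(id);
      events.push('teardown');
    };
  });
}

const subscription = ticker(1000).subscribe(value => events.push(`value ${value}`));

// Unsubscribing before the first tick stops the timer and runs the teardown
subscription.unsubscribe();

console.log(events); // ['teardown']
```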

Transformation Operators

Transformation operators are used to transform the emissions of an Observable into a new sequence of emissions. They are typically used to reshape the data stream to better fit the requirements of the application. Some examples of transformation operators include map, pluck, scan, switchMap, and mergeMap. Note that pluck is deprecated as of RxJS 7 in favor of map.
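Before moving on to the flattening operators, here is a small sketch of the two simplest transformations from that list: map changes each value on its own, while scan carries an accumulated state from one emission to the next. The input numbers are arbitrary.

```typescript
import { from } from 'rxjs';
import { map, scan } from 'rxjs/operators';

const runningTotals: number[] = [];

from([1, 2, 3, 4])
  .pipe(
    // map transforms each value independently: 10, 20, 30, 40
    map(n => n * 10),
    // scan emits the accumulated total after every value: 10, 30, 60, 100
    scan((total, n) => total + n, 0)
  )
  .subscribe(total => runningTotals.push(total));

console.log(runningTotals); // [10, 30, 60, 100]
```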

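Example in TypeScript

import { from, fromEvent } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Assumes the page contains at least one <button> element
const button = document.querySelector('button')!;

fromEvent(button, 'click').pipe(
  // fetch() and response.json() both return Promises; from() wraps the chain in an Observable
  switchMap(() =>
    from(
      fetch('https://jsonplaceholder.typicode.com/todos/1').then(response => response.json())
    )
  )
).subscribe(todo => {
  console.log(todo);
});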

In this example, we use the fromEvent creation operator to create an Observable that emits a value every time a button is clicked. We then use the switchMap operator to transform the click event into a new Observable that fetches some data from an API.

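The switchMap operator unsubscribes from the previous inner Observable whenever a new click arrives, so we only ever receive the response that belongs to the latest click. Keep in mind that unsubscribing from a Promise-based Observable does not abort the underlying HTTP request; the stale result is simply ignored. We subscribe to the resulting Observable and log the parsed JSON to the console.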

The switchMap operator is useful for flattening nested Observables and mapping the values emitted by an Observable into a new Observable. It's often used when making HTTP requests or chaining multiple API calls together.
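The switching behavior is easier to see without a browser or a network. In the sketch below, Subjects stand in for the click stream and for two pending responses; all names are hypothetical and exist only for this illustration.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Stand-ins for two in-flight responses
const firstResponse$ = new Subject<string>();
const secondResponse$ = new Subject<string>();

// Stand-in for the click stream; the number says which "request" the click starts
const clicks$ = new Subject<number>();

const received: string[] = [];

clicks$
  .pipe(switchMap(n => (n === 1 ? firstResponse$ : secondResponse$)))
  .subscribe(value => received.push(value));

clicks$.next(1);                              // subscribe to the first response
clicks$.next(2);                              // switch: the first response is dropped
firstResponse$.next('late reply to click 1'); // ignored, no longer subscribed
secondResponse$.next('reply to click 2');     // delivered

console.log(received); // ['reply to click 2']
```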

Filtering Operators

Filtering operators are used to selectively emit or ignore emissions based on certain criteria. They can be used to reduce the amount of data flowing through the data stream and improve performance. Some examples of filtering operators include filter, distinctUntilChanged, take, skip, and debounceTime.

Example in TypeScript

import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

// Create an Observable from an array of numbers
const source$ = from([1, 2, 3, 4, 5]);

// Use the filter operator to emit only even numbers
const even$ = source$.pipe(filter(value => value % 2 === 0));

// Subscribe to the filtered Observable and log each value
even$.subscribe(value => console.log(value)); // Output: 2, 4

In this example, we use the from creation operator to create an Observable that emits the numbers of an array one by one. We then use the filter operator to let only the even numbers through. Finally, we subscribe to the filtered Observable and log each even number to the console. The filter operator is one of many filtering operators in RxJS; it passes a value along only when the predicate returns true for it.
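Filtering operators can also be stacked. As a small sketch with arbitrary input values, the pipeline below drops consecutive duplicates, skips the first remaining value, and then stops after two values.

```typescript
import { from } from 'rxjs';
import { distinctUntilChanged, skip, take } from 'rxjs/operators';

const kept: number[] = [];

from([1, 1, 2, 2, 3, 3, 4])
  .pipe(
    distinctUntilChanged(), // drop consecutive duplicates: 1, 2, 3, 4
    skip(1),                // ignore the first value:      2, 3, 4
    take(2)                 // complete after two values:   2, 3
  )
  .subscribe(value => kept.push(value));

console.log(kept); // [2, 3]
```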

Combination Operators

Combination operators are used to combine multiple Observables into a single Observable. They can be used to merge, join, or concatenate data streams from different sources. Some examples of combination operators include merge, concat, combineLatest, forkJoin, and zip.

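Example in TypeScript

import { forkJoin, of } from 'rxjs';
import { delay } from 'rxjs/operators';

// delay is a pipeable operator, so it is applied through pipe()
const task1$ = of('Result from Task 1').pipe(delay(3000));
const task2$ = of('Result from Task 2').pipe(delay(2000));
const task3$ = of('Result from Task 3').pipe(delay(1000));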

forkJoin([task1$, task2$, task3$]).subscribe(results => {
  console.log(results);
});

In this example, we create three Observables using the of() creation operator and simulate a delay in their emissions using the delay() operator. We then pass an array of these Observables to the forkJoin() operator, which waits for all of them to complete and then emits a single array containing the last value from each, in the same order as the input array.

When we subscribe to the resulting Observable, we log the array of results to the console. Since we delayed the emissions of the Observables, the forkJoin() operator waits for the longest delay (in this case, 3 seconds) before emitting the combined results.

The forkJoin() operator is a useful tool for combining the emissions of multiple Observables into a single value. It's often used when making multiple API calls and waiting for all of them to complete before performing an action.
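To see how forkJoin differs from some of the other combination operators, here is a short sketch that uses synchronous sources, so no timers are needed. The input values are arbitrary.

```typescript
import { concat, forkJoin, of, zip } from 'rxjs';

// forkJoin: one emission, built from the LAST value of each source
const joined: unknown[] = [];
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe(result => joined.push(result));

// concat: runs the sources one after another
const concatenated: number[] = [];
concat(of(1, 2), of(3, 4)).subscribe(value => concatenated.push(value));

// zip: pairs values by position (first with first, second with second, ...)
const zipped: unknown[] = [];
zip(of(1, 2, 3), of('a', 'b', 'c')).subscribe(pair => zipped.push(pair));

console.log(JSON.stringify(joined));       // [[3,"b"]]
console.log(JSON.stringify(concatenated)); // [1,2,3,4]
console.log(JSON.stringify(zipped));       // [[1,"a"],[2,"b"],[3,"c"]]
```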

Multicasting Operators

Multicasting operators are used to share a single Observable execution among multiple subscribers. They can be used to reduce the amount of duplicate processing and network requests in an application. Some examples of multicasting operators include share, publish, refCount, shareReplay, and publishReplay. As of RxJS 7, publish, refCount, and publishReplay are deprecated in favor of share and shareReplay.

Example in TypeScript

import { interval } from 'rxjs';
import { take, share } from 'rxjs/operators';

// Create an Observable that emits a value every second
const source$ = interval(1000).pipe(take(3));

// Use the share operator to multicast the Observable
const shared$ = source$.pipe(share());

// Subscribe to the shared Observable twice
shared$.subscribe(value => console.log(`Subscriber 1 received: ${value}`));
shared$.subscribe(value => console.log(`Subscriber 2 received: ${value}`));

// Output:
// Subscriber 1 received: 0
// Subscriber 2 received: 0
// Subscriber 1 received: 1
// Subscriber 2 received: 1
// Subscriber 1 received: 2
// Subscriber 2 received: 2

In this example, we use the interval creation operator to create an Observable that emits a sequence of numbers at one-second intervals, and take(3) to stop after three values. We then use the share operator to multicast those emissions to multiple subscribers. Finally, we subscribe to the shared Observable twice and log the emissions to the console. Because both subscribers share a single interval, each tick is delivered to Subscriber 1 and then to Subscriber 2. The share operator is one of several multicasting operators in RxJS that share a single Observable execution among multiple subscribers.
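The effect of sharing can be checked by counting how often the producer runs. The sketch below uses a hand-written Observable that deliberately never completes, so that share keeps one execution alive for both subscribers; the counter and variable names are ours.

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

let executions = 0;

// The function passed to the constructor runs once per execution of the Observable.
// It never completes, so a shared execution stays alive for later subscribers.
const source$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(executions);
});

// Without share(): every subscriber triggers its own execution
source$.subscribe();
source$.subscribe();
const coldExecutions = executions;

// With share(): both subscribers attach to the same execution
executions = 0;
const shared$ = source$.pipe(share());
shared$.subscribe();
shared$.subscribe();
const sharedExecutions = executions;

console.log(coldExecutions);   // 2
console.log(sharedExecutions); // 1
```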

Error Handling Operators

Error handling operators are used to handle errors that occur during the execution of an Observable. They can be used to recover from errors, retry failed operations, or ignore errors and continue processing. Some examples of error handling operators include catchError, retry, onErrorResumeNext, finalize, and retryWhen.

Example in TypeScript

import { of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5, 'six', 7, 8, 9);
const result = source.pipe(
  map((val: number | string) => {
    const parsed = parseInt(String(val), 10);
    // parseInt returns NaN instead of throwing, so we raise the error ourselves
    if (Number.isNaN(parsed)) {
      throw new Error(`Cannot parse "${val}"`);
    }
    return parsed;
  }),
  catchError(err => {
    console.error('Error:', err);
    // Replace the failed stream with a fallback Observable
    return of(0);
  })
);

result.subscribe({
  next: (val: number) => console.log('Parsed value:', val),
  error: (err: unknown) => console.error('Error:', err),
  complete: () => console.log('Complete')
});

In this example, the source Observable emits a mixture of numbers and one string, 'six', that cannot be parsed to a number. Because parseInt returns NaN rather than throwing, the map operator throws an error explicitly when parsing fails. The catchError operator then logs the error and replaces the failed stream with an Observable that emits a single value of 0. The subscriber therefore receives the values 1 through 5, then 0, then the completion notification. The values after 'six' (7, 8, and 9) are never processed, because an error terminates the source and catchError only substitutes a new stream in its place.
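An error ends the Observable it occurs in, and catchError replaces that Observable with the one it returns, so values that come after the failing one are not processed. If the goal is to keep going, one common pattern is to move the risky step and its catchError into an inner Observable, so that only the inner stream is replaced. The following is a sketch of that idea; parseStrict is a hypothetical helper written for this example.

```typescript
import { from, of } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';

// Hypothetical helper: parses an integer and throws on non-numeric input
function parseStrict(input: string): number {
  const parsed = Number.parseInt(input, 10);
  if (Number.isNaN(parsed)) {
    throw new Error(`Cannot parse "${input}"`);
  }
  return parsed;
}

const parsedValues: number[] = [];

from(['1', '2', 'oops', '4'])
  .pipe(
    mergeMap(input =>
      // Each value gets its own short-lived inner Observable
      of(input).pipe(
        map(value => parseStrict(value)),
        // Only this inner Observable is replaced; the outer stream keeps running
        catchError(() => of(0))
      )
    )
  )
  .subscribe(value => parsedValues.push(value));

console.log(parsedValues); // [1, 2, 0, 4]
```

The fallback value 0 is an arbitrary choice here; returning EMPTY instead would simply skip the unparseable value.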

Pipeable Operators

Pipeable operators, formerly known as lettable operators, are a style of operator introduced in RxJS 5.5 that provides a more modular and composable approach to working with operators. Instead of chaining operators together using dot notation, pipeable operators are imported as standalone functions and then combined using the pipe method of an Observable.

This allows for more flexible composition of operators, and because only the operators you import end up in your bundle, it also improves tree shaking. Some examples of pipeable operators include map, filter, scan, mergeMap, and catchError.
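Since pipeable operators are plain functions, a sequence of them can be bundled and reused. The sketch below uses the standalone pipe function exported by rxjs for that; squareOfEvens is a name we made up for the bundled chain.

```typescript
import { from, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// A reusable operator chain built with the standalone pipe() function
const squareOfEvens = pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => n * n)
);

const squares: number[] = [];

// The bundled chain is passed to the pipe() method like any single operator
from([1, 2, 3, 4, 5, 6])
  .pipe(squareOfEvens)
  .subscribe(value => squares.push(value));

console.log(squares); // [4, 16, 36]
```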

Example in TypeScript

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const data = [
  { name: 'John', age: 30 },
  { name: 'Mary', age: 25 },
  { name: 'Bob', age: 40 },
];

from(data)
  .pipe(
    mergeMap(user => {
      // simulate an API call that returns user's full name
      return fetch(`https://example.com/users/${user.name}`).then(response => {
        return response.json();
      }).then(data => {
        return { ...user, fullName: data.fullName };
      });
    })
  )
  .subscribe(user => {
    console.log(user);
  });

In this example, we first create an Observable from an array of user objects using the from() creation operator. We then use the mergeMap() pipeable operator to map each user object to a Promise that calls an API to retrieve the user's full name (the example.com URL is only a placeholder). mergeMap() accepts Promises as well as Observables and subscribes to each one for us. Inside the Promise chain, we merge the result of the API call back into the original user object using the spread operator.

When we subscribe to the resulting Observable, we log each user object to the console, which includes the original properties as well as the newly added fullName property.

The mergeMap() operator is a powerful tool for working with observables that emit other observables, such as when making API calls or performing asynchronous operations. It allows us to flatten nested observables and merge their emissions into a single stream of values.
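Because the URL in the example above is only a placeholder, the code cannot be run as it stands. For experimenting offline, here is a sketch of the same pipeline with the API call swapped for an in-memory lookup. The fullNames map, its contents, and lookupFullName are invented for this illustration.

```typescript
import { from, Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface User {
  name: string;
  age: number;
}

interface UserWithFullName extends User {
  fullName: string;
}

// Made-up data standing in for the API response
const fullNames = new Map<string, string>([
  ['John', 'John Smith'],
  ['Mary', 'Mary Jones'],
]);

// Hypothetical stand-in for the API call: returns an Observable of the enriched user
function lookupFullName(user: User): Observable<UserWithFullName> {
  return of({ ...user, fullName: fullNames.get(user.name) ?? 'unknown' });
}

const enriched: UserWithFullName[] = [];

from([
  { name: 'John', age: 30 },
  { name: 'Mary', age: 25 },
  { name: 'Bob', age: 40 },
])
  .pipe(mergeMap(user => lookupFullName(user)))
  .subscribe(user => enriched.push(user));

console.log(enriched.map(user => user.fullName)); // ['John Smith', 'Mary Jones', 'unknown']
```

With real asynchronous calls, mergeMap emits results in the order the inner Observables produce them, which is not necessarily the order of the source values.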


In this post, I've provided a brief overview of different operators in RxJS and their use cases. However, this is by no means an exhaustive list, as RxJS provides a vast array of operators for working with Observables. I encourage you to refer to the official RxJS documentation to learn about all the available operators and their usage. It's always recommended to refer to official sources when working with complex libraries like RxJS, as relying on unofficial sources can lead to incomplete or inaccurate information. With its rich set of operators and powerful capabilities, RxJS is a valuable tool for reactive programming in JavaScript.