DevToolBoxGRATIS
Blog

Guía Completa de RxJS: Programación Reactiva para JavaScript

22 min readpor DevToolBox Team

TL;DR

RxJS is the go-to library for reactive programming in JavaScript. It models everything as Observable streams that you transform with composable operators like map, filter, switchMap, and debounceTime. Observables are lazy (unlike Promises), support multiple emissions, and can be cancelled. This guide covers creating Observables, all four Subject types, 15+ essential operators, error handling, multicasting, schedulers, marble-diagram testing, Angular and React integration, and real-world patterns like typeahead search, polling, and retry with exponential backoff.

Key Takeaways

  • Observables are lazy, cancellable streams that emit zero to many values over time, unlike Promises which are eager and single-value.
  • Operators like switchMap, mergeMap, and concatMap solve different concurrency scenarios; choosing the right one prevents bugs.
  • BehaviorSubject, ReplaySubject, and AsyncSubject each serve distinct multicasting patterns beyond the base Subject.
  • catchError and retry operators provide robust, composable error-handling pipelines.
  • Always unsubscribe from long-lived Observables to prevent memory leaks; use takeUntil, async pipe (Angular), or useEffect cleanup (React).
  • Marble-diagram testing with TestScheduler enables deterministic, synchronous testing of async streams.

1. What Is RxJS and Reactive Programming?

RxJS (Reactive Extensions for JavaScript) is a library for composing asynchronous and event-based programs using Observable sequences. Reactive programming is a paradigm that treats data as streams that flow through transformation pipelines. Instead of imperatively pulling data, you declaratively describe how data should be transformed as it arrives.

Think of a spreadsheet: when you change cell A1, every formula referencing A1 updates automatically. RxJS brings this model to JavaScript. DOM events, HTTP responses, WebSocket messages, timers, and any async source become streams you can filter, combine, and transform with a consistent API.

import { fromEvent } from 'rxjs';
import { map, filter, debounceTime } from 'rxjs/operators';

// Reactive: declare the data pipeline
const clicks$ = fromEvent(document, 'click').pipe(
  debounceTime(300),
  map(event => ({ x: event.clientX, y: event.clientY })),
  filter(pos => pos.x > 100)
);

// Subscribe to start receiving values
clicks$.subscribe(pos => console.log(pos));

2. Observables vs Promises

Both Observables and Promises handle asynchronous values, but they differ fundamentally. Understanding these differences is critical for choosing the right tool.

FeaturePromiseObservable
EmissionsSingle valueZero to many values
ExecutionEager (starts immediately)Lazy (starts on subscribe)
CancellationNot cancellableUnsubscribe to cancel
OperatorsLimited (.then, .catch, .finally)100+ composable operators
MulticastingShared by defaultUnicast by default, multicast via Subjects
Error handling.catch() or try/awaitcatchError, retry, retryWhen operators
Async/awaitNative supportUse firstValueFrom() or lastValueFrom()
// Promise: eager, single value
const promise = fetch('/api/data'); // starts immediately

// Observable: lazy, multiple values
import { interval } from 'rxjs';
const obs$ = interval(1000); // does nothing until subscribed
const sub = obs$.subscribe(n => console.log(n)); // 0, 1, 2, ...
sub.unsubscribe(); // cancels the stream

3. Creating Observables

RxJS provides many creation functions. Each wraps a different source into an Observable stream.

of and from

import { of, from } from 'rxjs';

// of: emit each argument as a separate value
of(1, 2, 3).subscribe(v => console.log(v));
// Output: 1, 2, 3

// from: convert iterable, array, or Promise to Observable
from([10, 20, 30]).subscribe(v => console.log(v));
// Output: 10, 20, 30

// from with Promise
from(fetch('/api/users').then(r => r.json()))
  .subscribe(data => console.log(data));

// from with iterable (generator)
function* gen() { yield 1; yield 2; yield 3; }
from(gen()).subscribe(v => console.log(v));

interval and timer

import { interval, timer } from 'rxjs';
import { take } from 'rxjs/operators';

// interval: emit 0, 1, 2, ... every N ms
interval(1000).pipe(take(5))
  .subscribe(n => console.log(n));
// Output: 0, 1, 2, 3, 4 (one per second)

// timer: wait, then optionally repeat
timer(2000).subscribe(() => console.log('2s delay'));

// timer with period: wait 1s, then emit every 500ms
timer(1000, 500).pipe(take(4))
  .subscribe(n => console.log(n));
// Output: 0 (at 1s), 1 (at 1.5s), 2 (at 2s), 3 (at 2.5s)

fromEvent

import { fromEvent } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

// DOM click events
const clicks$ = fromEvent(document, 'click').pipe(
  throttleTime(1000),
  map(e => ({ x: e.clientX, y: e.clientY }))
);
clicks$.subscribe(pos => console.log(pos));

// Keyboard events
const keys$ = fromEvent<KeyboardEvent>(document, 'keydown').pipe(
  map(e => e.key)
);
keys$.subscribe(key => console.log('Pressed:', key));

Custom Observable with new Observable()

import { Observable } from 'rxjs';

const custom$ = new Observable(subscriber => {
  subscriber.next('Hello');
  subscriber.next('World');

  const id = setTimeout(() => {
    subscriber.next('Async value');
    subscriber.complete();
  }, 1000);

  // Teardown logic (called on unsubscribe)
  return () => clearTimeout(id);
});

custom$.subscribe({
  next: val => console.log(val),
  error: err => console.error(err),
  complete: () => console.log('Done')
});

4. Subjects: Subject, BehaviorSubject, ReplaySubject, AsyncSubject

A Subject is both an Observable and an Observer. It multicasts values to all subscribers. RxJS provides four types, each suited to different scenarios.

Subject

A plain Subject multicasts to current subscribers. Late subscribers miss previous emissions.

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(v => console.log('A:', v));
subject.next(1); // A: 1
subject.next(2); // A: 2

subject.subscribe(v => console.log('B:', v));
subject.next(3); // A: 3, B: 3  (B missed 1 and 2)

BehaviorSubject

Requires an initial value and immediately emits the current value to new subscribers.

import { BehaviorSubject } from 'rxjs';

const state$ = new BehaviorSubject<string>('initial');

state$.subscribe(v => console.log('A:', v)); // A: initial
state$.next('updated'); // A: updated

state$.subscribe(v => console.log('B:', v)); // B: updated
console.log('Current:', state$.getValue()); // Current: updated

ReplaySubject

Replays a specified number of past emissions to new subscribers.

import { ReplaySubject } from 'rxjs';

const replay$ = new ReplaySubject<number>(2); // buffer last 2

replay$.next(1);
replay$.next(2);
replay$.next(3);

// Late subscriber receives 2 and 3 (last 2 values)
replay$.subscribe(v => console.log(v)); // 2, 3

// Time-bounded replay: buffer values from last 500ms
const timed$ = new ReplaySubject<number>(Infinity, 500);

AsyncSubject

Only emits the last value, and only when complete() is called.

import { AsyncSubject } from 'rxjs';

const async$ = new AsyncSubject<number>();
async$.subscribe(v => console.log(v));

async$.next(1); // nothing yet
async$.next(2); // nothing yet
async$.next(3); // nothing yet
async$.complete(); // Output: 3 (only the last value)

5. Essential Operators

Operators are the building blocks of RxJS pipelines. They transform, filter, combine, and manage Observable streams. Here are the most important ones.

Transformation: map, scan, reduce

import { of } from 'rxjs';
import { map, scan, reduce } from 'rxjs/operators';

// map: transform each value
of(1, 2, 3).pipe(
  map(x => x * 10)
).subscribe(v => console.log(v)); // 10, 20, 30

// scan: accumulate (emits each intermediate result)
of(1, 2, 3, 4, 5).pipe(
  scan((acc, val) => acc + val, 0)
).subscribe(v => console.log(v)); // 1, 3, 6, 10, 15

// reduce: accumulate (emits only final result)
of(1, 2, 3, 4, 5).pipe(
  reduce((acc, val) => acc + val, 0)
).subscribe(v => console.log(v)); // 15

Filtering: filter, distinctUntilChanged, debounceTime

import { from, fromEvent } from 'rxjs';
import { filter, distinctUntilChanged, debounceTime, map } from 'rxjs/operators';

// filter: pass values meeting a condition
from([1, 2, 3, 4, 5, 6]).pipe(
  filter(x => x % 2 === 0)
).subscribe(v => console.log(v)); // 2, 4, 6

// distinctUntilChanged: skip consecutive duplicates
from([1, 1, 2, 2, 3, 1]).pipe(
  distinctUntilChanged()
).subscribe(v => console.log(v)); // 1, 2, 3, 1

// debounceTime: wait for silence, then emit latest
fromEvent(input, 'input').pipe(
  map(e => e.target.value),
  debounceTime(300),
  distinctUntilChanged()
).subscribe(term => search(term));

Flattening: switchMap, mergeMap, concatMap, exhaustMap

These are the most important operators to understand. Each handles inner Observables differently.

import { fromEvent, interval } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap, take } from 'rxjs/operators';

// switchMap: cancel previous inner, keep only latest
// Best for: typeahead search, route changes
searchInput$.pipe(
  switchMap(term => http.get('/search?q=' + term))
  // If user types fast, previous requests are cancelled
);

// mergeMap: run all inner Observables concurrently
// Best for: parallel HTTP requests, logging
fileList$.pipe(
  mergeMap(file => uploadFile(file), 3) // max 3 concurrent
);

// concatMap: queue inner Observables, run sequentially
// Best for: ordered operations, sequential writes
actions$.pipe(
  concatMap(action => saveToDatabase(action))
  // Each save completes before the next starts
);

// exhaustMap: ignore new emissions while inner is active
// Best for: login buttons, form submissions
loginBtn$.pipe(
  exhaustMap(() => http.post('/login', credentials))
  // Rapid clicks are ignored until request completes
);

Utility: tap, take, takeUntil

import { interval, Subject } from 'rxjs';
import { tap, take, takeUntil } from 'rxjs/operators';

// tap: side effects without modifying the stream
source$.pipe(
  tap(v => console.log('Before:', v)),
  map(v => v * 2),
  tap(v => console.log('After:', v))
);

// take: emit only the first N values
interval(1000).pipe(take(3))
  .subscribe(v => console.log(v)); // 0, 1, 2, then completes

// takeUntil: emit until notifier Observable emits
const destroy$ = new Subject<void>();
interval(100).pipe(
  takeUntil(destroy$)
).subscribe(v => console.log(v));

// Later: stop the stream
destroy$.next();
destroy$.complete();

6. Piping and Composition

The pipe() method chains operators into transformation pipelines. You can also create reusable custom operators.

import { pipe } from 'rxjs';
import { filter, map, tap } from 'rxjs/operators';

// Basic piping
source$.pipe(
  filter(x => x > 0),
  map(x => x * 2),
  tap(x => console.log(x))
);

// Custom reusable operator
function multiplyPositive(factor: number) {
  return pipe(
    filter((x: number) => x > 0),
    map((x: number) => x * factor)
  );
}

// Usage
of(-1, 2, -3, 4).pipe(
  multiplyPositive(10)
).subscribe(v => console.log(v)); // 20, 40

// Combining multiple streams
import { combineLatest, merge, forkJoin, zip } from 'rxjs';

// combineLatest: emit when any source emits
combineLatest([obs1$, obs2$]).subscribe(([a, b]) => {});

// forkJoin: wait for all to complete, emit last values
forkJoin([http1$, http2$]).subscribe(([res1, res2]) => {});

// merge: combine outputs into single stream
merge(clicks$, keys$, timer$).subscribe(event => {});

7. Error Handling

Errors in RxJS terminate the Observable stream by default. The catchError and retry operators provide composable error recovery.

import { of, throwError, timer } from 'rxjs';
import { catchError, retry, tap, switchMap } from 'rxjs/operators';

// catchError: intercept and recover from errors
http.get('/api/data').pipe(
  catchError(err => {
    console.error('Request failed:', err);
    return of({ data: [], fallback: true }); // return fallback
  })
);

// retry: re-subscribe N times on error
http.get('/api/data').pipe(
  retry(3), // retry up to 3 times
  catchError(err => of({ error: 'All retries failed' }))
);

// retry with delay (exponential backoff)
http.get('/api/data').pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      const delayMs = Math.pow(2, retryCount) * 1000;
      console.log('Retry ' + retryCount + ' in ' + delayMs + 'ms');
      return timer(delayMs);
    }
  }),
  catchError(err => of({ error: 'Final failure' }))
);

// Error in inner Observable (switchMap)
clicks$.pipe(
  switchMap(() =>
    http.get('/api/data').pipe(
      catchError(err => of(null)) // catch inside inner
    )
  )
);

8. Multicasting

By default, each subscription to an Observable creates a new execution. Multicasting shares a single execution across subscribers using share, shareReplay, or Subjects.

import { interval } from 'rxjs';
import { share, shareReplay, take } from 'rxjs/operators';

// Without sharing: each subscriber gets its own stream
const cold$ = interval(1000).pipe(take(3));
cold$.subscribe(v => console.log('A:', v)); // own timer
cold$.subscribe(v => console.log('B:', v)); // different timer

// share(): multicast to current subscribers
const shared$ = interval(1000).pipe(
  take(3),
  share()
);
shared$.subscribe(v => console.log('A:', v));
shared$.subscribe(v => console.log('B:', v));
// Both see the same values at the same time

// shareReplay(1): cache and replay latest to late subscribers
const data$ = http.get('/api/config').pipe(
  shareReplay(1) // cache the response
);
// First subscriber triggers the HTTP call
data$.subscribe(config => renderHeader(config));
// Second subscriber reuses the cached response
data$.subscribe(config => renderFooter(config));

9. Schedulers

Schedulers control when and where Observable emissions happen. They are rarely used directly, but understanding them is essential for testing and performance tuning.

import { of, asyncScheduler, asapScheduler, queueScheduler } from 'rxjs';
import { observeOn, subscribeOn } from 'rxjs/operators';

// asyncScheduler: uses setTimeout (macrotask)
console.log('Before');
of(1, 2, 3).pipe(
  observeOn(asyncScheduler)
).subscribe(v => console.log(v));
console.log('After');
// Output: Before, After, 1, 2, 3

// asapScheduler: uses Promise.resolve (microtask)
// Executes before setTimeout but after current synchronous code

// queueScheduler: synchronous, but uses a queue to prevent recursion

// subscribeOn: changes when subscribe() starts execution
source$.pipe(
  subscribeOn(asyncScheduler) // defer subscription to next macrotask
);

// observeOn: changes when notifications are delivered
source$.pipe(
  observeOn(asyncScheduler) // deliver values asynchronously
);

10. Testing with Marble Diagrams

Marble diagrams visually represent Observable timelines. The TestScheduler lets you write deterministic tests for async streams.

// Marble syntax:
// -     = one frame of time (10ms by default)
// a-z   = emitted value
// |     = complete
// #     = error
// ()    = synchronous grouping
// ^     = subscription point (hot only)
// !     = unsubscription point

import { TestScheduler } from 'rxjs/testing';
import { map, delay } from 'rxjs/operators';

describe('RxJS marble tests', () => {
  let scheduler: TestScheduler;

  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });

  it('should map values', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source =  '--a--b--c|';
      const expected = '--x--y--z|';
      const values = { a: 1, b: 2, c: 3, x: 10, y: 20, z: 30 };

      const result = cold(source, values).pipe(
        map(v => v * 10)
      );
      expectObservable(result).toBe(expected, values);
    });
  });

  it('should test hot Observables', () => {
    scheduler.run(({ hot, expectObservable }) => {
      // ^ marks subscription, values before ^ are ignored
      const source =  '--a--^--b--c--|';
      const expected =       '--b--c--|';

      expectObservable(hot(source)).toBe(expected);
    });
  });
});

11. RxJS in Angular

Angular uses RxJS extensively. The HttpClient returns Observables, reactive forms expose value changes as streams, and the async pipe handles subscriptions automatically.

// Angular service with RxJS
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { BehaviorSubject } from 'rxjs';
import { switchMap, shareReplay, catchError } from 'rxjs/operators';

@Injectable({ providedIn: 'root' })
export class UserService {
  private refresh$ = new BehaviorSubject<void>(undefined);

  users$ = this.refresh$.pipe(
    switchMap(() => this.http.get<User[]>('/api/users')),
    shareReplay(1),
    catchError(err => {
      console.error(err);
      return of([]);
    })
  );

  constructor(private http: HttpClient) {}
  refresh() { this.refresh$.next(); }
}

// Component with async pipe (auto-subscribes/unsubscribes)
// @Component({
//   template: `
//     <ul>
//       <li *ngFor="let user of userService.users$ | async">
//         {{ user.name }}
//       </li>
//     </ul>
//   `
// })

// Angular 16+ takeUntilDestroyed
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

export class MyComponent {
  constructor() {
    interval(1000).pipe(
      takeUntilDestroyed() // auto-unsubscribes on destroy
    ).subscribe(v => console.log(v));
  }
}

12. RxJS with React

While React does not use RxJS natively, custom hooks make integration clean. The key is subscribing in useEffect and cleaning up on unmount.

import { useState, useEffect, useRef } from 'react';
import { BehaviorSubject, Subject } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

// Generic hook to subscribe to an Observable
function useObservable<T>(observable$: Observable<T>, initial: T): T {
  const [value, setValue] = useState<T>(initial);

  useEffect(() => {
    const sub = observable$.subscribe(setValue);
    return () => sub.unsubscribe(); // cleanup
  }, [observable$]);

  return value;
}

// Typeahead search hook
function useSearch() {
  const [results, setResults] = useState([]);
  const search$ = useRef(new Subject<string>()).current;

  useEffect(() => {
    const sub = search$.pipe(
      debounceTime(300),
      distinctUntilChanged(),
      switchMap(term =>
        fetch('/api/search?q=' + term).then(r => r.json())
      )
    ).subscribe(setResults);

    return () => sub.unsubscribe();
  }, []);

  return { results, search: (term: string) => search$.next(term) };
}

13. Common Real-World Patterns

Typeahead Search

import { fromEvent } from 'rxjs';
import {
  map, debounceTime, distinctUntilChanged,
  switchMap, filter, catchError
} from 'rxjs/operators';

const input = document.getElementById('search');

const typeahead$ = fromEvent(input, 'input').pipe(
  map(e => e.target.value.trim()),
  filter(term => term.length >= 2),
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term =>
    fetch('/api/search?q=' + encodeURIComponent(term))
      .then(r => r.json())
  ),
  catchError((err, source$) => {
    console.error(err);
    return source$; // re-subscribe to keep listening
  })
);

typeahead$.subscribe(results => renderResults(results));

Polling

import { timer, Subject } from 'rxjs';
import {
  switchMap, takeUntil, retry, shareReplay
} from 'rxjs/operators';

const stop$ = new Subject<void>();

// Poll every 5 seconds, starting immediately
const poll$ = timer(0, 5000).pipe(
  switchMap(() => fetch('/api/status').then(r => r.json())),
  retry(2),
  takeUntil(stop$),
  shareReplay(1)
);

poll$.subscribe(status => updateUI(status));

// Stop polling
// stop$.next(); stop$.complete();

Retry with Exponential Backoff

import { Observable, timer, throwError } from 'rxjs';
import { mergeMap, retryWhen, catchError } from 'rxjs/operators';

function fetchWithBackoff(url: string, maxRetries = 3) {
  return new Observable(subscriber => {
    fetch(url)
      .then(r => {
        if (!r.ok) throw new Error('HTTP ' + r.status);
        return r.json();
      })
      .then(data => {
        subscriber.next(data);
        subscriber.complete();
      })
      .catch(err => subscriber.error(err));
  }).pipe(
    retry({
      count: maxRetries,
      delay: (error, retryCount) => {
        const backoff = Math.pow(2, retryCount) * 1000;
        const jitter = Math.random() * 1000;
        return timer(backoff + jitter);
      }
    }),
    catchError(err => {
      console.error('All retries exhausted:', err);
      return throwError(() => err);
    })
  );
}

14. Preventing Memory Leaks

Forgetting to unsubscribe from long-lived Observables (intervals, WebSockets, fromEvent) is the most common RxJS mistake. Here are strategies for every context.

// Strategy 1: Manual unsubscribe
const sub = interval(1000).subscribe(v => console.log(v));
// later...
sub.unsubscribe();

// Strategy 2: takeUntil with destroy signal
const destroy$ = new Subject<void>();
interval(1000).pipe(
  takeUntil(destroy$)
).subscribe(v => console.log(v));
// On component destroy:
// destroy$.next(); destroy$.complete();

// Strategy 3: take / first / takeWhile
interval(1000).pipe(take(5)).subscribe(); // auto-completes after 5
source$.pipe(first()).subscribe(); // auto-completes after 1
source$.pipe(
  takeWhile(v => v < 100) // completes when condition is false
).subscribe();

// Strategy 4: Angular async pipe
// {{ data$ | async }} in template — auto-unsubscribes

// Strategy 5: React useEffect cleanup
// useEffect(() => {
//   const sub = obs$.subscribe(...);
//   return () => sub.unsubscribe();
// }, []);

// Strategy 6: Subscription bag
import { Subscription } from 'rxjs';
const bag = new Subscription();
bag.add(stream1$.subscribe(...));
bag.add(stream2$.subscribe(...));
// Unsubscribe all at once:
bag.unsubscribe();

15. Migration Tips

Migrating from callbacks or Promises to RxJS, or upgrading between RxJS versions, requires careful attention. Here are key tips.

From Promises to Observables

// Before: Promise chains
async function loadData() {
  const user = await fetch('/api/user').then(r => r.json());
  const posts = await fetch('/api/posts?userId=' + user.id)
    .then(r => r.json());
  return { user, posts };
}

// After: Observable pipeline
import { from, forkJoin } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';

const data$ = from(fetch('/api/user').then(r => r.json())).pipe(
  switchMap(user =>
    from(fetch('/api/posts?userId=' + user.id).then(r => r.json())).pipe(
      map(posts => ({ user, posts }))
    )
  )
);

// Converting Observable to Promise (when needed)
import { firstValueFrom, lastValueFrom } from 'rxjs';

const result = await firstValueFrom(data$);
const last = await lastValueFrom(data$);

RxJS 6/7 to RxJS 7+ Key Changes

// 1. Import from 'rxjs' instead of deep paths
// Before (RxJS 5): import { map } from 'rxjs/operators/map';
// After (RxJS 7+): import { map } from 'rxjs/operators';
// Or:               import { map } from 'rxjs'; // tree-shakable

// 2. toPromise() is deprecated, use firstValueFrom/lastValueFrom
// Before: const val = await obs$.toPromise();
// After:  const val = await firstValueFrom(obs$);

// 3. subscribe with object (partial observer still supported)
source$.subscribe({
  next: val => console.log(val),
  error: err => console.error(err),
  complete: () => console.log('Done')
});

// 4. retry config object replaces retryWhen for most cases
// Before: retryWhen(errors => errors.pipe(delay(1000), take(3)))
// After:  retry({ count: 3, delay: 1000 })

// 5. shareReplay requires explicit refCount config
source$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

16. Operator Quick Reference

OperatorCategoryUse Case
mapTransformTransform each emitted value
filterFilterEmit only values passing a condition
switchMapFlattenCancel previous inner, use latest (search, routing)
mergeMapFlattenRun inner Observables concurrently (parallel IO)
concatMapFlattenQueue inner Observables sequentially (ordered writes)
exhaustMapFlattenIgnore new emissions while inner is active (form submit)
debounceTimeFilterWait for silence before emitting (input fields)
distinctUntilChangedFilterSkip consecutive duplicate values
catchErrorErrorIntercept errors and return fallback Observable
retryErrorRe-subscribe on error N times
tapUtilitySide effects without modifying the stream
takeFilterEmit first N values then complete
takeUntilFilterEmit until notifier Observable emits (unsubscribe pattern)
scanTransformAccumulate values over time (running total)
shareReplayMulticastShare execution and replay N values to late subscribers

17. Frequently Asked Questions

What is RxJS and why should I use it?

RxJS (Reactive Extensions for JavaScript) is a library for composing asynchronous and event-based programs using Observable sequences. It provides powerful operators to transform, combine, and manage data streams. Use it when dealing with complex async workflows, real-time data, event coordination, or when you need cancellation and backpressure support that Promises cannot provide.

What is the difference between an Observable and a Promise?

A Promise emits a single value and is eagerly executed — it starts immediately upon creation. An Observable can emit zero, one, or many values over time and is lazy — it only executes when subscribed to. Observables also support cancellation via unsubscribe(), while Promises cannot be cancelled once started.

When should I use switchMap vs mergeMap vs concatMap?

Use switchMap when you only care about the latest inner Observable (typeahead search, route changes). Use mergeMap when you want all inner Observables to run concurrently (parallel HTTP requests). Use concatMap when order matters and you want inner Observables to execute sequentially (file uploads, ordered API calls).

How do I prevent memory leaks with RxJS subscriptions?

Always unsubscribe from long-lived Observables. In Angular use the async pipe or takeUntilDestroyed(). In React, unsubscribe in the useEffect cleanup function. You can also use operators like take(n), takeUntil(), first(), or takeWhile() to auto-complete subscriptions.

What is a Subject in RxJS and when should I use it?

A Subject is both an Observable and an Observer — it can multicast values to multiple subscribers and accept values via next(). Use Subject for event buses, BehaviorSubject when you need an initial/current value, ReplaySubject to replay past emissions to late subscribers, and AsyncSubject when you only want the last value before completion.

How does error handling work in RxJS?

Use the catchError operator in a pipe to intercept errors and return a fallback Observable. The retry operator re-subscribes to the source on error. For conditional retry logic, use retry({ count, delay }) with exponential backoff. Errors terminate the Observable by default unless caught.

What are marble diagrams and how do I test with them?

Marble diagrams are a visual syntax for representing Observable sequences over time. In testing, you use the TestScheduler from rxjs/testing to create hot and cold Observables using marble strings like "--a--b--c|" where dashes are time frames, letters are emissions, and "|" is completion. This enables deterministic, synchronous testing of async streams.

Can I use RxJS with React or is it only for Angular?

RxJS works with any JavaScript framework or vanilla JS. While Angular has built-in RxJS integration through HttpClient and reactive forms, you can use RxJS in React with custom hooks, in Vue with composition API, in Node.js for stream processing, or in plain browser code for DOM event handling.

Conclusion

RxJS transforms how you think about asynchronous programming. Instead of managing callbacks, Promise chains, and manual state, you compose declarative pipelines that handle timing, concurrency, errors, and cancellation in a unified model. Start with the core operators (map, filter, switchMap, catchError, takeUntil) and expand to more specialized operators as your needs grow. Regardless of whether you use Angular, React, or plain JavaScript, the reactive patterns in this guide apply everywhere.

The most important habit to build is always managing subscriptions. Use takeUntil or the framework-specific cleanup patterns (async pipe in Angular, useEffect cleanup in React) to prevent memory leaks. Combine this with shareReplay for caching, marble testing for reliability, and error-handling pipelines for resilience, and you will have a robust reactive architecture.

𝕏 Twitterin LinkedIn
¿Fue útil?

Mantente actualizado

Recibe consejos de desarrollo y nuevas herramientas.

Sin spam. Cancela cuando quieras.

Prueba estas herramientas relacionadas

{ }JSON FormatterJSON Validator

Artículos relacionados

Guía Angular: Componentes, Servicios, RxJS, NgRx y Signals Angular 17+

Domina Angular. Componentes y data binding, directivas, inyección de dependencias, formularios reactivos, RxJS, Router con lazy loading, NgRx y Signals Angular 17.

Promises JavaScript y Async/Await: Guía Completa

Domina Promises y async/await: creación, encadenamiento, Promise.all y manejo de errores.

Guia completa de genericos TypeScript 2026: de lo basico a patrones avanzados

Domina los genericos de TypeScript: parametros de tipo, restricciones, tipos condicionales, tipos mapeados, tipos utilitarios y patrones del mundo real.