Some tips on observables

Posted on 2025-06-29 in Trucs et astuces

You can download the Jupiter notebook or download the script.

Subscribing to the same observable twice

The source will be consumed twice. To prevent that, a Subject must be used instead.

const sourceMultipleSubs = from([1, 2]);

sourceMultipleSubs.subscribe(value => console.log("Subscription 1", value));
sourceMultipleSubs.subscribe(value => console.log("Subscription 2", value));

Various behaviors

const source$ = from([1, 2, 3, 1, 4]);
// Will emit the values of source$ each 3,9 seconds. The 1st value is emitted immediatly.
const delayedValues = zip(source$, timer(0, 3_900), (value, time) => value);

Behavior of from(Promise) and combineLatest

The promise merged with from will never be called again when used in combineLatest. It will only ever use its first value.

const someHttpRequest = () => {
    console.log('calling the function');
    return Promise.resolve(null);
}

const sub = combineLatest(interval(2_000).pipe(take(10)), from(someHttpRequest())).subscribe(console.log)

Behavior of switchMap

When multiple switchMap are chained, the observable created by the latest switchMap will continue to emit values until we pass again in this switch map.

const sub = delayedValues.pipe(
  tap((value) => console.log('Source value', value)),
  switchMap(value => interval(1000).pipe(
      filter(value => value <= 2),
      take(1),
      map(() => value * 10),
      tap(value => console.log('1st inner observable emmiting value:', value)),
      finalize(() => console.log('Ending 1st inner observable created from:', value)),
    )
  ),
  switchMap(value => interval(500).pipe(
      take(10),
      map(() => value * 10),
      tap(value => console.log('2nd inner observable emmiting value:', value)),
      finalize(() => console.log('Ending 2nd inner observable created from:', value)),
    )
  )
).subscribe(value => console.log('Value in subscription:', value), console.error, () => console.log('Ending subscription'));

Behavior of filter

Filter won’t trigger the rest of the pipeline until it accepts a value. This also means observable created by switchMap will continue to execute until a new value passes the filter.

const sub = delayedValues.pipe(
  tap((value) => console.log('Source value', value)),
  filter(value => value <= 2),
  switchMap(value => interval(1000).pipe(
      take(5),
      map(() => value * 10),
      finalize(() => console.log('Ending inner observable created from:', value))
    )
  ),
).subscribe(value => console.log('Value in subscription:', value), console.error, () => console.log('Ending subscription'));

Using EMPTY in a switchMap

The observable will complete as soon as the switch to empty is done. Must use of({}) instead to only cancel a previous switchMap.

const obs1 = from([1, 2])
    .pipe(switchMap(() => EMPTY))
    .subscribe(
        () => console.log('Next with EMPTY'),
        () => {},
        () => console.log('Complete EMPTY')
    );


const obs2 = from([1, 2])
    .pipe(switchMap(() => of({})))
    .subscribe(
        () => console.log('Next of({})'),
        () => {},
        () => console.log('Complete of({})')
    );

Behavior of switchMap with Promise

Promise can’t be canceled. When reaching a switchMap again, RxJS will just start the new promise. This means, many Promises can execute in parallel. The rest of the pipe will only be executed for promise that succeeds before switchMap is called again. So, if switchMap is called a second time while the initial is still running, only the second call will execute the full pipe.

With mergeMap, the rest of the pipe will be executed for each promise that resolves.

With contactMap, we will execute concatMap in order and wait for the previous one to complete fully (ie the full pipe that follows) before starting the next one. The rest of the pipe is executed as expected. This is the same behavior than with observables.

const sub = delayedValues.pipe(
  tap((value) => console.log('Source value', value)),
  switchMap(value => {
      console.log("Entering switchMap", value);
      return new Promise(resolve => setTimeout(() => {
          console.log("Resolving promise", value);
          resolve(value * 10);
      }, 10000));
  }),
  tap(value => console.log("Value in pipe", value)),
).subscribe(value => console.log('Value in subscription:', value), console.error, () => console.log('Ending subscription'));
const sub = delayedValues.pipe(
  tap((value) => console.log('Source value', value)),
  mergeMap(value => {
      console.log("Entering mergeMap", value);
      return new Promise(resolve => setTimeout(() => {
          console.log("Resolving promise", value);
          resolve(value * 10);
      }, 10000));
  }),
  tap(value => console.log("Value in pipe", value)),
).subscribe(value => console.log('Value in subscription:', value), console.error, () => console.log('Ending subscription'));
const sub = delayedValues.pipe(
  tap((value) => console.log('Source value', value)),
  concatMap(value => {
      console.log("Entering concatMap", value);
      return new Promise(resolve => setTimeout(() => {
          console.log("Resolving promise", value);
          resolve(value * 10);
      }, 10000));
  }),
  tap(value => console.log("Value in pipe", value)),
).subscribe(value => console.log('Value in subscription:', value), console.error, () => console.log('Ending subscription'));

If the Promise is resolved immediately, the observable will emit once with the last value from the promise and then end. So the chain can end prematurely if a source should continue to emit and Promise is used in the chain.

mergeMap and concatMap will execute the chain for all emitted values from the source.

const obs = from([1, 2])
    .pipe(switchMap(val => from(Promise.resolve(val * 10))))
    .subscribe(
        val => console.log('Next:', val),
        () => {},
        val => console.log('Completed with:', val)
    );
const obs = from([1, 2])
    .pipe(mergeMap(val => from(Promise.resolve(val * 10))))
    .subscribe(
        val => console.log('Next:', val),
        () => {},
        val => console.log('Completed with:', val)
    );
const obs = from([1, 2])
    .pipe(concatMap(val => from(Promise.resolve(val * 10))))
    .subscribe(
        val => console.log('Next:', val),
        () => {},
        val => console.log('Completed with:', val)
    );

Behavior of catchError and finalize

  • No matter where the finalize is, it will run! It will run after the error callback of the subscription. If there’s no error, it will run before the next callback of the subscription.
  • If we have multiple finalize, they will run in the order of definition, as part of the pipe.
  • Having a catchError prevents the error callback of the subscription to be run.
    • It’s like a catch on a Promise chain: we revert to a normal flow. Using throwError cancels that flow.
    • It will only catch errors that occurs before its definition. It has no impact on errors that occurs after. So it must be placed as far in the pipe as possible to catch as many errors as possible.
const obs = source$.pipe(
    map(value => {
        if (value <= 2) {
            return value
        }

        throw new Error("oops")
    }),
    tap(value => console.log('Tapping:', value)),
    finalize(() => console.log('running finalize')),
).subscribe()
const obs = source$.pipe(
    finalize(() => console.log('running finalize')),
    map(value => {
        if (value <= 2) {
            return value
        }

        throw new Error("oops")
    }),
    catchError(() => {
        console.log('An error occurred');
        return of("Kaboom");
    }),
    tap(value => console.log('Tapping:', value)),
).subscribe(console.log, v => console.warn('On error', v))
const obs = source$.pipe(
    finalize(() => console.log('running finalize')),
    map(value => {
        if (value <= 2) {
            return value
        }

        throw new Error("oops")
    }),
    catchError(() => {
        console.log('An error occurred');
        return throwError(() => new Error("Kaboom"));
    }),
    tap(value => console.log('Tapping:', value)),
).subscribe(console.log, v => console.warn('On error', v))
const obs = source$.pipe(
    finalize(() => console.log('running finalize 1')),
    catchError(() => {
        console.log('An error occurred');
        return of("Kaboom");
    }),
    map(value => {
        if (value <= 2) {
            return value
        }

        throw new Error("oops")
    }),
    tap(value => console.log('Tapping:', value)),
    finalize(() => console.log('running finalize 2')),
).subscribe(console.log, v => console.warn('On error', v))
const obs = source$.pipe(
    finalize(() => console.log('running finalize 1')),
    map(value => {
        if (value <= 2) {
            return value
        }

        throw new Error("oops")
    }),
    catchError(() => {
        console.log('An error occurred');
        return of("Kaboom");
    }),
    tap(value => console.log('Tapping:', value)),
    finalize(() => console.log('running finalize 2')),
).subscribe(console.log, v => console.warn('On error', v))

Switching to an observable that uses switchMap internally

The inner observable (and thus its inner switchMap) is correctly unsubscribed from and thus does nothing when it must do nothing.

const buildInnerObs = startingValue => delayedValues.pipe(
    map(value => startingValue * 10),
    tap(value => console.log('Inner obs:', value)),
    switchMap(value => interval(1000).pipe(
        take(10),
        map(() => value * 10),
        finalize(() => console.log('Ending inner interval')))
    ),
    tap(value => console.log('Inner obs switch map:', value)),
    finalize(() => console.log('Ending inner switchMap')),
);

const sub = delayedValues.pipe(
    tap(value => console.log('Starting with:', value)),
    switchMap(value => value <= 2 ? buildInnerObs(value) : of({})),
).subscribe(value => console.log('In subscription:', value));

combineLatest

It will wait for all observable to emit their first values before emitting.

const sub = combineLatest([delayedValues, source$.pipe(delay(1000))]).subscribe(console.log);