Fix: RxJS Not Working — Observable Not Emitting, Memory Leak from Unsubscribed Stream, or Operator Behaving Unexpectedly

FixDevs

Quick Answer

How to fix RxJS issues — subscription management, switchMap vs mergeMap vs concatMap, error handling with catchError, Subject types, cold vs hot observables, and Angular async pipe.

The Problem

An observable emits values but the subscriber never receives them:

const subject = new Subject<number>();

subject.subscribe(val => console.log(val));  // Never logs

// ... elsewhere in the code
subject.complete();  // The Subject is terminated here
subject.next(1);     // Ignored: the Subject was completed before this call

Or a memory leak because subscriptions aren’t cleaned up:

// Angular component
ngOnInit() {
  this.userService.getUser().subscribe(user => {
    this.user = user;  // Subscription lives forever — even after component destroys
  });
}

Or switchMap drops requests unexpectedly:

this.searchInput.valueChanges.pipe(
  switchMap(query => this.api.search(query)),
).subscribe(results => this.results = results);
// Fast typing cancels previous requests — some results are lost

Or an error in one stream kills the entire observable chain:

this.data$.subscribe({
  error: err => console.error(err),
});
// After one error, the stream terminates and never emits again

Why This Happens

RxJS has nuanced behavior around subscription lifecycle and operator semantics:

  • Completed or errored observables never emit again: once a Subject has completed (via complete()) or errored (via error()), all later next() calls are silently ignored. Create a new Subject, or handle errors before they reach the subscriber.
  • Every subscribe() call creates a new subscription: a subscription to a stream that never completes stays active until you unsubscribe. In Angular components or React effects, subscriptions created in lifecycle methods must be explicitly unsubscribed when the component is destroyed or the effect cleans up.
  • switchMap cancels the previous inner observable: this is by design. Use mergeMap if you want all requests to complete, concatMap if order matters, or exhaustMap if you want to ignore new events while one is in progress.
  • Errors propagate downstream and terminate the stream: an error notification ends the observable, and nothing is emitted after it. Use catchError inside the pipe, before the subscriber, to recover.
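
The first point is easy to verify in isolation. The following is a minimal sketch, assuming RxJS 7.2 or later; the `log` array and the `first:` / `late:` labels are illustrative names, not part of any API.

```typescript
import { Subject } from 'rxjs';

const log: string[] = [];
const subject = new Subject<number>();

subject.subscribe(value => log.push(`first:${value}`));

subject.next(1);     // delivered to the first subscriber
subject.complete();  // terminates the Subject for good
subject.next(2);     // silently ignored: no error, no delivery

// A subscriber that arrives after completion only receives the complete notification.
subject.subscribe({
  next: value => log.push(`late:${value}`),
  complete: () => log.push('late:complete'),
});

console.log(log); // ["first:1", "late:complete"]
```

If a value appears to vanish, logging the `complete` and `error` callbacks like this is usually the quickest way to find out whether the stream was already closed.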

Fix 1: Manage Subscriptions to Prevent Memory Leaks

// Angular — traditional unsubscribe pattern
import { Component, OnInit, OnDestroy } from '@angular/core';
import { Subscription } from 'rxjs';

@Component({ ... })
export class UserComponent implements OnInit, OnDestroy {
  private subscription = new Subscription();

  ngOnInit() {
    this.subscription.add(
      this.userService.getUser().subscribe(user => this.user = user)
    );
    this.subscription.add(
      this.searchService.getResults().subscribe(r => this.results = r)
    );
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();  // Cancels all added subscriptions
  }
}

// Angular — modern approach with takeUntilDestroyed (Angular 16+)
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DestroyRef, inject } from '@angular/core';

@Component({ ... })
export class UserComponent {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    this.userService.getUser()
      .pipe(takeUntilDestroyed(this.destroyRef))
      .subscribe(user => this.user = user);
  }
}

// Angular — async pipe (best approach — auto-unsubscribes)
// template:
// <div *ngIf="user$ | async as user">{{ user.name }}</div>
user$ = this.userService.getUser();

// React — useEffect cleanup
useEffect(() => {
  const subscription = observable$.subscribe(value => setValue(value));
  return () => subscription.unsubscribe();  // Cleanup on unmount
}, []);

// Using take(1) for one-shot subscriptions
this.userService.getUser()
  .pipe(take(1))  // Auto-completes after first emission
  .subscribe(user => this.user = user);
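
Outside Angular 16+, a common alternative is a destroy notifier combined with takeUntil. The sketch below is framework-free and assumes RxJS 7.2 or later; `Widget`, `destroy()`, and `ticks$` are hypothetical stand-ins for a component, its teardown hook, and a long-lived source.

```typescript
import { Subject, takeUntil } from 'rxjs';
import type { Observable } from 'rxjs';

// Hypothetical stand-in for a component with a teardown hook.
class Widget {
  readonly seen: number[] = [];
  private readonly destroy$ = new Subject<void>();

  constructor(source$: Observable<number>) {
    source$
      .pipe(takeUntil(this.destroy$)) // completes the stream when destroy$ emits
      .subscribe(value => this.seen.push(value));
  }

  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const ticks$ = new Subject<number>(); // long-lived source that never completes
const widget = new Widget(ticks$);

ticks$.next(1);
ticks$.next(2);
widget.destroy();
ticks$.next(3); // not delivered: the subscription ended in destroy()

console.log(widget.seen); // [1, 2]
```

Placing takeUntil last in the pipe is the usual convention, so that operators after it cannot keep an inner subscription alive.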

Fix 2: Choose the Right Flattening Operator

All four operators handle inner observables differently:

import { from } from 'rxjs';
import {
  debounceTime, distinctUntilChanged,
  switchMap, mergeMap, concatMap, exhaustMap,
} from 'rxjs/operators';

// switchMap — cancels previous inner observable when new outer value arrives
// Use for: search autocomplete, navigation, latest-value-wins scenarios
this.searchInput.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(query => this.api.search(query)),
  // If user types fast, only the last request completes
).subscribe(results => this.results = results);

// mergeMap (flatMap) — all inner observables run concurrently, no cancellation
// Use for: parallel requests where all results matter
const userIds = [1, 2, 3, 4, 5];
from(userIds).pipe(
  mergeMap(id => this.api.getUser(id)),  // All 5 requests fire at once
).subscribe(user => this.allUsers.push(user));

// concatMap — processes inner observables one by one, in order
// Use for: sequential operations, file uploads, ordered processing
from(filesToUpload).pipe(
  concatMap(file => this.api.upload(file)),  // Uploads one at a time, in order
).subscribe(result => this.uploadResults.push(result));

// exhaustMap — ignores new outer values while inner is active
// Use for: form submit buttons, preventing double-submit
this.submitButton.clicks.pipe(
  exhaustMap(() => this.api.submitForm(this.form)),
  // Ignores additional clicks while submit is pending
).subscribe(response => this.handleResponse(response));
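
To compare the four operators side by side, the sketch below feeds the same sequence of events through each one. It assumes RxJS 7.2 or later and uses manually controlled Subjects as inner observables so the timing is deterministic; `trace`, `a$`, and `b$` are illustrative names.

```typescript
import { Subject, concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs';
import type { Observable, OperatorFunction } from 'rxjs';

type Flatten = (
  project: (key: string) => Observable<string>,
) => OperatorFunction<string, string>;

// Runs one fixed scenario through the given flattening operator.
function trace(flatten: Flatten): string[] {
  const outer$ = new Subject<string>();
  const a$ = new Subject<string>(); // inner observable for outer value "a"
  const b$ = new Subject<string>(); // inner observable for outer value "b"
  const seen: string[] = [];

  outer$
    .pipe(flatten(key => (key === 'a' ? a$ : b$)))
    .subscribe(value => seen.push(value));

  outer$.next('a');
  outer$.next('b'); // arrives while inner "a" is still active
  a$.next('a1');
  b$.next('b1');
  a$.complete();
  b$.next('b2');
  b$.complete();
  return seen;
}

const withSwitchMap = trace(project => switchMap(project));   // ["b1", "b2"]: "a" was cancelled
const withMergeMap = trace(project => mergeMap(project));     // ["a1", "b1", "b2"]: both run
const withConcatMap = trace(project => concatMap(project));   // ["a1", "b2"]: "b" starts after "a" completes
const withExhaustMap = trace(project => exhaustMap(project)); // ["a1"]: "b" was ignored

console.log({ withSwitchMap, withMergeMap, withConcatMap, withExhaustMap });
```

The concatMap run misses `b1` only because `b$` is a hot Subject that emits before concatMap subscribes to it; with a cold inner observable such as an HTTP request, nothing is lost, it is simply started later.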

Fix 3: Handle Errors Without Terminating the Stream

import { catchError, retry, retryWhen, switchMap, EMPTY, of, throwError, timer } from 'rxjs';
import { delay, take } from 'rxjs/operators';

// catchError — recover and continue
this.api.getUsers().pipe(
  catchError(err => {
    console.error('Failed to get users:', err);
    return of([]);  // Return empty array as fallback
    // return EMPTY;  // Complete without emitting
    // return throwError(() => err);  // Re-throw to propagate
  }),
).subscribe(users => this.users = users);

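// Keep the stream alive after errors (for long-lived streams)
// Note: switchMap cancels in-flight processing when a new message arrives;
// use concatMap or mergeMap instead if every message must be processed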
this.websocket.messages$.pipe(
  switchMap(msg => this.processMessage(msg).pipe(
    catchError(err => {
      console.error('Processing failed:', err);
      return EMPTY;  // Skip this message, continue listening
    })
  )),
).subscribe(result => this.handleResult(result));

// retry — retry on error
this.api.fetchData().pipe(
  retry(3),  // Retry up to 3 times before propagating error
).subscribe(data => this.data = data);

// Retry with a fixed delay (retryWhen is deprecated in favor of retry({ delay }); shown for older code)
this.api.fetchData().pipe(
  retryWhen(errors => errors.pipe(
    delay(1000),  // Wait 1 second between retries
    take(3),      // Give up after the third error; the result then completes instead of erroring
  )),
).subscribe(data => this.data = data);

// Retry with exponential backoff, using the retry config object (RxJS 7+)
this.api.fetchData().pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => timer(1000 * 2 ** (retryCount - 1)),  // Waits 1s, 2s, then 4s
  }),
).subscribe(data => this.data = data);
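
Where catchError sits in the pipe decides whether the outer stream survives. The sketch below contrasts the two placements; it assumes RxJS 7.2 or later, and `processMessage` (which fails on odd numbers) and `run` are hypothetical helpers written for this illustration.

```typescript
import { EMPTY, Subject, catchError, mergeMap, of, throwError } from 'rxjs';
import type { Observable } from 'rxjs';

// Hypothetical processing step: fails on odd numbers.
function processMessage(n: number): Observable<string> {
  return n % 2 === 1
    ? throwError(() => new Error(`cannot process ${n}`))
    : of(`processed ${n}`);
}

function run(recoverInside: boolean): string[] {
  const messages$ = new Subject<number>();
  const log: string[] = [];

  const stream$ = recoverInside
    // catchError on the inner observable: only the failing message is dropped
    ? messages$.pipe(mergeMap(n => processMessage(n).pipe(catchError(() => EMPTY))))
    // catchError on the outer pipe: the error is swallowed, but the stream still ends
    : messages$.pipe(mergeMap(n => processMessage(n)), catchError(() => EMPTY));

  stream$.subscribe({
    next: value => log.push(value),
    complete: () => log.push('complete'),
  });

  [2, 3, 4].forEach(n => messages$.next(n));
  return log;
}

const recovered = run(true);   // ["processed 2", "processed 4"]
const terminated = run(false); // ["processed 2", "complete"]

console.log({ recovered, terminated });
```

In the second run, message 4 is never processed because the outer subscription ended when the fallback observable completed.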

Fix 4: Understand Cold vs Hot Observables

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject, share, shareReplay } from 'rxjs';

// COLD observable — each subscriber gets its own execution
// HTTP calls are cold — each subscribe triggers a new request
const users$ = this.http.get('/api/users');
users$.subscribe(u => console.log('Sub 1:', u));
users$.subscribe(u => console.log('Sub 2:', u));
// Two HTTP requests fired!

// HOT observable — all subscribers share the same execution
// Make cold observables hot with share() or shareReplay()
const sharedUsers$ = this.http.get('/api/users').pipe(
  shareReplay(1),  // Cache last emission — new subscribers get it immediately
);
sharedUsers$.subscribe(u => console.log('Sub 1:', u));
sharedUsers$.subscribe(u => console.log('Sub 2:', u));
// Only ONE HTTP request, both subscribers receive the result

// Subject: manually push values, hot
const events$ = new Subject<Event>();
events$.subscribe(e => console.log('Subscriber 1:', e));
events$.next(new Event('click'));  // Only Subscriber 1 receives it
events$.subscribe(e => console.log('Subscriber 2:', e));
events$.next(new Event('click'));  // Both subscribers receive this one

// BehaviorSubject — has a current value, new subscribers get latest emission
const currentUser$ = new BehaviorSubject<User | null>(null);
currentUser$.value;  // Access current value synchronously
currentUser$.subscribe(u => console.log(u));  // Immediately receives null
currentUser$.next(loggedInUser);  // All subscribers get the new user

// ReplaySubject — replays N emissions to new subscribers
const recentActions$ = new ReplaySubject<Action>(5);  // Replay last 5
recentActions$.next({ type: 'LOAD' });
recentActions$.next({ type: 'SUCCESS' });
// New subscriber receives both LOAD and SUCCESS immediately

// AsyncSubject — only emits the last value, on complete
const result$ = new AsyncSubject<number>();
result$.next(1);
result$.next(2);
result$.next(3);
result$.complete();  // Now emits 3 to all subscribers
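
The cold versus shared behavior can be observed without a network by counting how often the source runs. This is a minimal sketch assuming RxJS 7.2 or later; `cold$` built with defer is a hypothetical stand-in for an HTTP call, and `executions` is an illustrative counter.

```typescript
import { defer, of, shareReplay } from 'rxjs';

let executions = 0;

// Hypothetical stand-in for an HTTP call: the factory runs once per subscription.
const cold$ = defer(() => {
  executions += 1;
  return of(`response #${executions}`);
});

const coldResults: string[] = [];
cold$.subscribe(value => coldResults.push(value));
cold$.subscribe(value => coldResults.push(value));
const coldExecutions = executions; // 2: each subscriber triggered its own run

executions = 0;
const shared$ = cold$.pipe(shareReplay(1));
const sharedResults: string[] = [];
shared$.subscribe(value => sharedResults.push(value));
shared$.subscribe(value => sharedResults.push(value));
const sharedExecutions = executions; // 1: the second subscriber got the cached value

console.log({ coldExecutions, coldResults, sharedExecutions, sharedResults });
```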

Fix 5: Common Operators and Patterns

import {
  map, filter, tap, take, skip, distinctUntilChanged,
  debounceTime, throttleTime, startWith, pairwise,
  combineLatest, forkJoin, merge, zip, withLatestFrom,
  scan, reduce, buffer, window, groupBy,
} from 'rxjs';

// Transform and filter
source$.pipe(
  map(x => x * 2),
  filter(x => x > 10),
  tap(x => console.log('Value:', x)),  // Side effect without transforming
  take(5),                              // Take only first 5 values
  distinctUntilChanged(),               // Skip duplicate consecutive values
);

// Rate limiting: debounceTime and throttleTime are alternatives; pick one per stream
searchInput$.pipe(
  debounceTime(300),          // Emit only after 300ms of silence
);
searchInput$.pipe(
  throttleTime(1000),         // Emit at most once per second
);

// Combining streams
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings })),
);
// Emits when ANY input emits, with latest values from all

forkJoin([
  this.api.getUser(id),
  this.api.getPosts(id),
]).pipe(
  map(([user, posts]) => ({ user, posts })),
);
// Emits once when ALL observables complete (like Promise.all)

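data$.pipe(
  withLatestFrom(currentFilter$),
  map(([data, filter]) => applyFilter(data, filter)),
);
// Emits only when data$ emits, pairing each value with the latest from currentFilter$
// (currentFilter$ is subscribed to, but its emissions alone do not trigger output)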

// Accumulate state
actions$.pipe(
  scan((state, action) => reducer(state, action), initialState),
);
// Like Array.reduce, but emits every intermediate state (reduce emits only the final one)

// Previous and current values
values$.pipe(
  startWith(null),
  pairwise(),  // Emits [previous, current]
  map(([prev, curr]) => ({ prev, curr, changed: prev !== curr })),
);
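
The difference between combineLatest and withLatestFrom is easiest to see with the same inputs fed to both. This is a small sketch assuming RxJS 7.2 or later; `data$`, `filter$`, `combined`, and `sampled` are illustrative names.

```typescript
import { Subject, combineLatest, withLatestFrom } from 'rxjs';

const data$ = new Subject<string>();
const filter$ = new Subject<string>();
const combined: string[] = [];
const sampled: string[] = [];

combineLatest([data$, filter$])
  .subscribe(([data, filter]) => combined.push(`${data}/${filter}`));

data$
  .pipe(withLatestFrom(filter$))
  .subscribe(([data, filter]) => sampled.push(`${data}/${filter}`));

data$.next('d1');   // neither emits yet: filter$ has no value
filter$.next('f1'); // combineLatest emits d1/f1; withLatestFrom stays silent
data$.next('d2');   // both emit d2/f1
filter$.next('f2'); // only combineLatest emits d2/f2

console.log(combined); // ["d1/f1", "d2/f1", "d2/f2"]
console.log(sampled);  // ["d2/f1"]
```

Note that withLatestFrom drops `d1` entirely, because the secondary stream had not emitted yet; adding startWith to the secondary stream is one way to avoid that.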

Fix 6: Angular Async Pipe Best Practices

// AVOID: manual subscription in component
@Component({ ... })
class UserComponent implements OnInit, OnDestroy {
  user: User;
  private sub: Subscription;

  ngOnInit() {
    this.sub = this.userService.getUser(this.id).subscribe(u => this.user = u);
  }

  ngOnDestroy() {
    this.sub.unsubscribe();
  }
}

// PREFER: async pipe in template (auto-unsubscribes, triggers change detection)
@Component({
  template: `
    <div *ngIf="user$ | async as user; else loading">
      <h1>{{ user.name }}</h1>
    </div>
    <ng-template #loading>Loading...</ng-template>
  `,
  changeDetection: ChangeDetectionStrategy.OnPush,
})
class UserComponent {
  user$ = this.userService.getUser(this.id);
}

// Combine multiple streams in template
@Component({
  template: `
    <ng-container *ngIf="{
      user: user$ | async,
      posts: posts$ | async,
      loading: loading$ | async
    } as vm">
      <app-user [user]="vm.user" [posts]="vm.posts" />
      <app-spinner *ngIf="vm.loading" />
    </ng-container>
  `,
})
class DashboardComponent {
  user$ = this.userService.getUser();
  posts$ = this.postsService.getPosts();
  loading$ = combineLatest([this.user$, this.posts$]).pipe(
    map(() => false),
    startWith(true),
  );
}
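
An alternative to several async pipes is to build one view-model stream in the class and bind it once. The sketch below shows only the RxJS part, assuming RxJS 7.2 or later; `DashboardVm`, `initialVm`, and the Subjects standing in for service calls are hypothetical.

```typescript
import { Subject, combineLatest, map, startWith } from 'rxjs';

interface User { name: string }
interface Post { title: string }
interface DashboardVm { user: User | null; posts: Post[]; loading: boolean }

// Subjects stand in for the service calls so the example can be driven by hand.
const user$ = new Subject<User>();
const posts$ = new Subject<Post[]>();

const initialVm: DashboardVm = { user: null, posts: [], loading: true };

// One stream carrying everything the template needs.
const vm$ = combineLatest({ user: user$, posts: posts$ }).pipe(
  map(({ user, posts }): DashboardVm => ({ user, posts, loading: false })),
  startWith(initialVm), // emitted immediately, before any data arrives
);

const states: DashboardVm[] = [];
vm$.subscribe(vm => states.push(vm));

user$.next({ name: 'Ada' });
posts$.next([{ title: 'Hello' }]); // both inputs have emitted: loading flips to false

console.log(states.map(state => state.loading)); // [true, false]
```

In a template this would be bound with a single `vm$ | async as vm`, so each source is subscribed exactly once.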

Still Not Working?

Observable emits but async pipe doesn’t update the view: in OnPush change detection mode, Angular only checks a component when (1) an input property changes, (2) an event from the component or its children occurs, (3) ChangeDetectorRef.markForCheck() is called, or (4) an async pipe in its template receives a new value. With the async pipe and OnPush this normally works automatically. If it doesn’t, check that the template binds to a stable observable reference: a getter or method call that builds a new observable on every change-detection pass makes the async pipe resubscribe each time, and a cold observable bound in several places runs once per binding unless it is shared with shareReplay.

Subject.next() called before any subscriber: values emitted before a subscriber exists are lost on a plain Subject. Use BehaviorSubject (replays the latest value) or ReplaySubject (replays the last N values) if late subscribers need earlier emissions. This is the hot-observable behavior from Fix 4 in practice.
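
The same late subscriber sees different values depending on the Subject type. This is a minimal sketch assuming RxJS 7.2 or later; `lateSubscriber` is an illustrative helper, not a library function.

```typescript
import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

// Pushes 1 and 2, subscribes late, then pushes 3.
function lateSubscriber(subject: Subject<number>): number[] {
  subject.next(1);
  subject.next(2);
  const seen: number[] = [];
  subject.subscribe(value => seen.push(value)); // subscribes after 1 and 2 were pushed
  subject.next(3);
  return seen;
}

const plain = lateSubscriber(new Subject<number>());             // [3]
const behavior = lateSubscriber(new BehaviorSubject<number>(0)); // [2, 3]
const replay = lateSubscriber(new ReplaySubject<number>(5));     // [1, 2, 3]

console.log({ plain, behavior, replay });
```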

forkJoin never emits: forkJoin waits for all input observables to complete and then emits their last values. If any input never completes (a BehaviorSubject, for example), forkJoin never emits. Use combineLatest for streams that don’t complete, or add take(1) to force completion:

forkJoin([
  user$.pipe(take(1)),     // Force completion
  settings$.pipe(take(1)),
]).subscribe(([user, settings]) => { ... });
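
The effect can be reproduced with two BehaviorSubjects. This is a runnable sketch assuming RxJS 7.2 or later; the sample values and the `log` array are made up for illustration.

```typescript
import { BehaviorSubject, forkJoin, take } from 'rxjs';

const user$ = new BehaviorSubject('ada');
const settings$ = new BehaviorSubject({ theme: 'dark' });
const log: string[] = [];

// Never fires: neither BehaviorSubject completes.
forkJoin([user$, settings$]).subscribe(() => log.push('without take(1)'));

// Fires once: take(1) completes each input after its current value.
forkJoin([user$.pipe(take(1)), settings$.pipe(take(1))])
  .subscribe(([user, settings]) => log.push(`${user}:${settings.theme}`));

console.log(log); // ["ada:dark"]
```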

For related Angular issues, see Fix: Angular RxJS Memory Leak and Fix: Angular Signals Not Updating.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
