import { Observable, ReplaySubject, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

interface CancellationSignals {
  [key: string]: Subject<any>;
}

export abstract class CancellableService {
  private cancellationSignals: CancellationSignals = {};

  public cancelPendingRequests(id: string): void {
    if (!this.cancellationSignals[id]) {
      return;
    }

    this.cancellationSignals[id].next(null);
    this.cancellationSignals[id].complete();
  }

  protected getCancellationSignal(id: string): Subject<any> {
    return this.cancellationSignals[id];
  }

  protected withCancellation(id: string, observable: Observable<any>): Observable<any> {
    if (!this.getCancellationSignal(id)) {
      this.addCancellationSignal(id);
    }

    return observable.pipe(takeUntil(this.cancellationSignals[id]));
  }

  private addCancellationSignal(id: string): void {
    this.cancellationSignals[id] = new ReplaySubject(1);
  }
}
