import { Observable } from 'rxjs';
import { filter, mergeMap, take } from 'rxjs/operators';

import { AbstractWebsocketWatcher } from './abstract-websocket-watcher';
import { WebsocketPusherConnectionService } from '../websocket-pusher-connection.service';

/**
 * Websocket watcher that exposes channel events as RxJS observables.
 *
 * The channel/client objects are taken from the static streams on
 * `WebsocketPusherConnectionService`; each watch method waits for the first
 * truthy emission of the relevant stream and then binds an event handler.
 */
export class PusherWebsocketWatcher extends AbstractWebsocketWatcher {
  /** Event name bound on every `private-topic-*` channel in `watchTopic`. */
  private readonly COMMON_EVENT_NAME = 'common';

  constructor() {
    super();
  }

  /**
   * Streams payloads of the `-topic-${destination}` event received on the
   * current user's channel.
   *
   * Only the first truthy value of `userChannel$` is used (`take(1)`), so the
   * returned observable stays bound to that channel instance.
   *
   * @param destination Topic suffix used to build the event name.
   * @returns A cold observable; each subscription binds its own handler and
   *   unbinds it again when the subscription is torn down.
   */
  public watchUserTopic<T>(destination: string): Observable<T> {
    return WebsocketPusherConnectionService.userChannel$.pipe(
      filter(userChannel => !!userChannel),
      take(1),
      mergeMap(userChannel => {
        return new Observable<T>(subscriber => {
          const eventName = `-topic-${destination}`;
          const handler = (payload: T): void => {
            subscriber.next(payload);
          };
          userChannel.bind(eventName, handler);

          // The user channel is shared and outlives this subscription, so
          // remove only our own handler; otherwise every subscription would
          // leave a dangling callback on the channel.
          return () => {
            userChannel.unbind(eventName, handler);
          };
        });
      })
    );
  }

  /**
   * Subscribes to the `private-topic-${destination}` channel and streams the
   * payloads of its `common` event.
   *
   * Only the first truthy value of `pusher$` is used (`take(1)`).
   *
   * @param destination Topic suffix used to build the channel name.
   * @returns A cold observable; tearing down a subscription unbinds its
   *   handler and unsubscribes from the channel.
   */
  public watchTopic<T>(destination: string): Observable<T> {
    return WebsocketPusherConnectionService.pusher$.pipe(
      filter(pusher => !!pusher),
      take(1),
      mergeMap(pusher => {
        return new Observable<T>(subscriber => {
          const channel = pusher.subscribe(`private-topic-${destination}`);
          const handler = (payload: T): void => {
            subscriber.next(payload);
          };
          channel.bind(this.COMMON_EVENT_NAME, handler);

          return () => {
            // Unbind first so a stale handler does not remain attached to the
            // channel object after the subscription is gone.
            channel.unbind(this.COMMON_EVENT_NAME, handler);
            // NOTE(review): this unsubscribes the whole channel, which
            // presumably also affects other active watchers of the same
            // topic — confirm whether reference counting is needed.
            channel.unsubscribe();
          };
        });
      })
    );
  }
}
