import {Inject, Injectable, OnDestroy} from '@angular/core';
import {EMPTY, Observable, Subject} from 'rxjs';
import {catchError, debounceTime, delayWhen, filter, mapTo, takeUntil} from 'rxjs/operators';
import {AppConfigService} from 'app/app/config';
import {EventSourcePolyfill, OnMessageEvent} from 'ng-event-source';
import {AuthService} from '@matchsource/authentication';
import {ApiConfig, ApiConfigService} from '@matchsource/api-config';
import {IdleService} from 'app/core/services/idle.service';

export enum ServerEvent {
  FeatureToggle = 'FEATURE_TOGGLE_UPDATED',
  RoleSwitch = 'ROLE_SWITCHED',
}

export class ServerSourceEvent {
  eventType: ServerEvent;
  payload: any;
}

class ServerSourceEventConnection {
  readonly source$: Observable<ServerSourceEvent>;

  private source: EventSourcePolyfill;

  private closeRequested = false;

  constructor(url: string, token: string, onOpen: () => void) {
    this.source$ = new Observable<ServerSourceEvent>(observer => {
      this.source = new EventSourcePolyfill(url, {
        headers: {
          Authorization: `Bearer ${token}`,
        },
      } as any);
      this.source.onmessage = (event: OnMessageEvent) => {
        const data = JSON.parse(event.data) as ServerSourceEvent;
        observer.next(data);
      };

      if (onOpen) {
        this.source.onopen = () => onOpen();
      }

      this.source.onerror = () => {
        if (this.closeRequested) {
          observer.complete();
        } else {
          this.close();
          observer.error();
          observer.complete();
        }
      };
    });
  }

  close() {
    this.closeRequested = true;
    this.source.close();
  }

  closed() {
    return this.source.readyState === EventSource.CLOSED;
  }

  asObservable() {
    return this.source$;
  }
}

@Injectable({
  providedIn: 'root',
})
export class ServerSourceEventService implements OnDestroy {
  connection$: Subject<ServerSourceEvent> = new Subject();

  featureToggle$: Observable<void>;

  roleSwitch$: Observable<string>;

  private connection: ServerSourceEventConnection;

  private readonly reconnect$: Subject<void> = new Subject();

  private readonly destroy$ = new Subject<void>();

  private connectionAttempt = 0;

  constructor(
    @Inject(ApiConfigService) private readonly apiConfig: ApiConfig,
    @Inject(AppConfigService) private readonly appConfig: MsAppConfig,
    private readonly auth: AuthService,
    private readonly idle: IdleService
  ) {
    this.reconnect$
      .pipe(
        debounceTime(this.appConfig.sse.reconnectDebounce),
        delayWhen(() => this.idle.active$.pipe(filter(active => active))),
        takeUntil(this.destroy$)
      )
      .subscribe(() => {
        this.connect();
      });

    this.featureToggle$ = this.connection$.pipe(
      filter((e: ServerSourceEvent) => e.eventType === ServerEvent.FeatureToggle),
      mapTo(undefined)
    );

    this.roleSwitch$ = this.connection$.pipe(
      filter((e: ServerSourceEvent) => e.eventType === ServerEvent.RoleSwitch),
      mapTo(undefined)
    );
  }

  async connect() {
    if (this.connectionAttempt === this.appConfig.sse.connectionAttempts) {
      return;
    }
    this.connectionAttempt++;

    if (this.connection && !this.connection.closed()) {
      throw new Error('Source is already connected. Call close method to close current connection.');
    }

    this.connection = new ServerSourceEventConnection(
      `${this.apiConfig.commonServiceUrl}sse/events-watcher`,
      await this.auth.getAccessToken(),
      () => (this.connectionAttempt = 0)
    );
    this.connection
      .asObservable()
      .pipe(
        catchError(() => {
          this.reconnect$.next();
          return EMPTY;
        }),
        takeUntil(this.destroy$)
      )
      .subscribe((e: ServerSourceEvent) => this.connection$.next(e));
  }

  close() {
    this.connection.close();
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();

    this.close();
    this.connection$.complete();
  }
}
