import { Injectable } from '@angular/core';
import { RxStompState, StompHeaders } from '@stomp/rx-stomp';
import { Observable, Subject, merge, throwError } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';
import { isNotEmptyObject } from '@shared/base/core';
import { LocalStorageConstants } from '@shared/constants/local-storage-constants';
import { AuthService } from '@shared/helpers/auth.service';
import { SettingsState } from '../states/settings.state';
import { RxStompService } from './rx-stomp.service';

@Injectable({
  providedIn: 'root',
})
export class WebSocketService {
  private stopListen$ = new Subject<void>();

  constructor(
    private auth: AuthService,
    private settingsState: SettingsState,
    private stompService: RxStompService,
  ) {
    this.init();
  }

  private init(): void {
    this.stompService.configure({
      brokerURL: this.settingsState.wsPath,
      connectHeaders: {
        [LocalStorageConstants.TokenHeader]: localStorage.getItem(LocalStorageConstants.Token),
      },
      heartbeatIncoming: 10000,
      heartbeatOutgoing: 10000,
      reconnectDelay: 500,
      discardWebsocketOnCommFailure: true,
      /**
       * Дебагер
       * @param msg
       */
      // debug: (msg: string): void => {
      //   console.log(`WS_DEBUG: `, msg);
      // },
    });

    this.auth.loggedIn$.subscribe((logged) => {
      if (!logged) {
        this.stopListen$.next();
        this.stompService.deactivate().then();
      } else {
        this.stompService.activate();
        this.listenErrorsChannel();
      }
    });

    const statesEnum = Object.entries(RxStompState);
    const statesToNameMap = statesEnum.splice(statesEnum.length / 2, statesEnum.length).reduce(
      (result, item) => {
        result[item[1]] = item[0];

        return result;
      },
      {} as Record<RxStompState, string>,
    );

    this.stompService.connectionState$
      .pipe(takeUntil(this.stopListen$))
      .subscribe((connectionState) => {
        // eslint-disable-next-line no-console
        console.log(`WS_LOG: connectionState`, statesToNameMap[connectionState]);
      });
  }

  /**
   * Подписка на канал веб-сокета.
   * Важно делать отписку от канала используя RxJS оператор (takeUntil), чтобы сокет не продлжал слушать канал,
   * кроме этого, можно передать сигналы отписки (не обязательный параметр).
   * Сервис автоматически отписывается от каналов, если произошло разлогинивание пользователя.
   *
   * @param channel имя канала, строка для подписки
   * @param stopListenEvents (не обязательный параметр) сигналы для отписки от канала
   */
  public listenChannel<D = unknown>(
    channel: string,
    stopListenEvents?: Observable<unknown>[],
    additionalHeaders?: { [key: string]: string },
  ): Observable<D> {
    const token = localStorage.getItem(LocalStorageConstants.Token);

    if (!this.auth.loggedIn || !token) {
      return throwError(() => new Error('Can not start listen channel, user does not logged'));
    }

    const stopListens$: Observable<unknown>[] = [this.stopListen$];

    if (stopListenEvents && stopListenEvents.length) {
      stopListens$.push(...stopListenEvents);
    }

    let headers: StompHeaders = {
      [LocalStorageConstants.TokenHeader]: token,
    };

    if (isNotEmptyObject(additionalHeaders)) {
      headers = {
        ...headers,
        ...additionalHeaders,
      };
    }

    return this.stompService
      .watch({
        destination: channel,
        subHeaders: headers,
        unsubHeaders: headers,
      })
      .pipe(
        map((data) => JSON.parse(data.body) as D),
        takeUntil(merge(...stopListens$)),
      );
  }

  private listenErrorsChannel(): void {
    this.listenChannel('/topic/errors').subscribe((err) => {
      throw new Error(`WS_ERROR: ${err}`);
    });
  }
}
