import { Pipe, PipeTransform } from '@angular/core';
import { Observable, of } from 'rxjs';
import { catchError, map, pairwise, startWith, takeUntil } from 'rxjs/operators';
import { BaseObject } from '@shared/base/base-object';
import { isNotNull } from '@shared/base/core';
import { RecalculatedInstrumentDTO } from '@shared/dto/positions/recalculated-instrument-dto';
import { PriceStreamsService } from '@shared/helpers/price-streams.service';

/**
 * Direction marker for a price movement: the pipe below assigns `'success'`
 * when the current price is higher than the value it is compared against and
 * `'danger'` when it is lower.
 */
type PriceChange = 'success' | 'danger';

/**
 * Shape of each value emitted by `GetInstrumentPriceBySocketPipe.transform`.
 */
interface PriceBySocketData {
  /** Price to display: the latest streamed price, or a fallback when none is available. */
  price: number;
  /** Absolute (non-negative) percentage difference between `price` and the reference price; 0 in the fallback cases. */
  percentChange: number;
  /** Direction of `price` relative to the reference price; left as `null` when no direction was determined. */
  changeType: PriceChange;
  /** Direction of `price` relative to the previously emitted value; left as `null` when no direction was determined. */
  momentChangeType: PriceChange;
  /** Taken from the `priceRecountTime` field of the DTO the price came from; `null` in the error fallback. */
  close: string;
}
/**
 * Maps the live price stream of an instrument to display-ready data: the
 * current price, its absolute percentage distance from a reference price and
 * two direction markers (versus the reference price and versus the previous
 * emission).
 *
 * The result is an Observable, so it is presumably consumed together with the
 * `async` pipe in templates.
 */
@Pipe({
  name: 'getInstrumentPriceBySocket',
})
export class GetInstrumentPriceBySocketPipe extends BaseObject implements PipeTransform {
  constructor(private priceStreamsService: PriceStreamsService) {
    super();
  }

  /**
   * Builds the stream of display data for one instrument.
   *
   * @param price Initial instrument data; its `price` is the baseline for the
   *   percentage change. May be null/undefined, in which case a zero-priced
   *   placeholder is used.
   * @param instrumentId Identifier passed to the price stream service.
   * @param percent Optional percentage already contained in `price.price`.
   *   When provided, the reference price is `price.price / (percent / 100 + 1)`;
   *   otherwise the reference price is `price.price` itself.
   * @returns Observable that emits once immediately (based on `price`) and then
   *   on every value from the price stream. If the stream errors, a single
   *   neutral fallback value is emitted and the Observable completes.
   */
  public transform(
    price: RecalculatedInstrumentDTO,
    instrumentId: string,
    percent: number = null,
  ): Observable<PriceBySocketData> {
    const defaultInstrument: RecalculatedInstrumentDTO = price || { price: 0 };

    return this.getRecalculatedInstrument(instrumentId).pipe(
      // Seeded twice on purpose: `pairwise` needs two values before it emits,
      // so the double seed produces an immediate [default, default] pair.
      startWith(defaultInstrument),
      startWith(defaultInstrument),
      pairwise(),
      map(([previousData, currentData]): PriceBySocketData => {
        if (isNotNull(currentData?.price)) {
          const previousValue = isNotNull(previousData?.price) ? previousData : defaultInstrument;

          // Use `defaultInstrument` rather than `price`: `price` may be
          // null/undefined, and dereferencing it here would throw on the very
          // first emission and terminate the stream through `catchError`.
          const lastPrice = GetInstrumentPriceBySocketPipe.getReferencePrice(
            defaultInstrument.price,
            percent,
          );

          return {
            price: currentData.price,
            percentChange: GetInstrumentPriceBySocketPipe.getPercentChange(
              currentData.price,
              lastPrice,
            ),
            changeType: GetInstrumentPriceBySocketPipe.getDirection(currentData.price, lastPrice),
            momentChangeType: GetInstrumentPriceBySocketPipe.getDirection(
              currentData.price,
              previousValue.price,
            ),
            close: currentData.priceRecountTime,
          };
        }

        // No fresh price: keep showing the previous one without any direction.
        if (isNotNull(previousData?.price)) {
          return {
            price: previousData.price,
            percentChange: 0,
            changeType: null,
            momentChangeType: null,
            close: previousData.priceRecountTime,
          };
        }

        // Neither the current nor the previous value carries a price.
        return {
          price: price?.price || 0,
          percentChange: 0,
          changeType: null,
          momentChangeType: null,
          close: price?.priceRecountTime,
        };
      }),
      catchError(() =>
        of({
          price: price?.price || 0,
          percentChange: 0,
          changeType: null,
          momentChangeType: null,
          close: null,
        }),
      ),
      // `destroy$` is not declared in this class; presumably provided by BaseObject.
      takeUntil(this.destroy$),
    );
  }

  /** Returns the price the current value is compared against, undoing `percent` when it is provided. */
  private static getReferencePrice(basePrice: number, percent: number): number {
    return isNotNull(percent) ? basePrice / (percent / 100 + 1) : basePrice;
  }

  /**
   * Returns the absolute percentage difference between `current` and `reference`.
   * Yields 0 when the reference is zero or not a finite number, so that the
   * result is never `NaN` or `Infinity`.
   */
  private static getPercentChange(current: number, reference: number): number {
    if (!Number.isFinite(reference) || reference === 0) {
      return 0;
    }

    return Math.abs(((current - reference) / reference) * 100);
  }

  /** Returns 'success' when `current` is above `reference`, 'danger' when below, `null` otherwise. */
  private static getDirection(current: number, reference: number): PriceChange {
    if (current > reference) {
      return 'success';
    }

    if (current < reference) {
      return 'danger';
    }

    return null;
  }

  /** Thin wrapper around `PriceStreamsService.getInstrumentPrice` for the given instrument id. */
  private getRecalculatedInstrument(instrumentId: string): Observable<RecalculatedInstrumentDTO> {
    return this.priceStreamsService.getInstrumentPrice(instrumentId);
  }
}
