import { concat, defer, EMPTY, MonoTypeOperatorFunction, Observable, pipe, Subscription } from 'rxjs';
import {
  debounceTime,
  dematerialize,
  distinctUntilChanged,
  filter,
  finalize,
  ignoreElements,
  map,
  materialize,
  publish,
  refCount,
  share,
  switchMap,
  take,
  takeUntil,
  tap,
} from 'rxjs/operators';
import { filterEmpty } from 'core/rxjs';
import { Store } from 'core/store';
import { isScheduledData, tryUnwrapScheduledData } from './helpers/type.helpers';
import {
  LectaScheduler,
  LectaSchedulerConfig,
  QueueResult,
  Scheduled,
  ScheduledData,
  ScheduledInferredData,
  SchedulingInput,
  SchedulingSettings,
  UNLIMITED_SCHEDULER_LIMIT,
} from './interfaces';

const BUFFER_TIME_MS = 10;
const RESET_QUEUE_TIMEOUT_MS = 1000;

interface BufferScheduleItem<V = unknown> {
  scheduleFactory: Scheduled<V>;
  parameters?: SchedulingSettings;
}

const INITIAL_STORE_STATE = {
  remainingLimit: UNLIMITED_SCHEDULER_LIMIT,
  items: [] as BufferScheduleItem[],
  terminated: false,
};

export class SchedulerQueueInstance implements LectaScheduler {
  private queue$: Observable<unknown> | undefined;
  private resetQueueSubscription: Subscription;
  private store = new Store<typeof INITIAL_STORE_STATE, { terminate: boolean }>(INITIAL_STORE_STATE);
  private terminated$: Observable<boolean> = this.store.select('terminated').pipe(filter(terminated => terminated));
  private schedulingBuffer = this.store.select('items').pipe(
    distinctUntilChanged(),
    debounceTime(BUFFER_TIME_MS),
    map(items => {
      // eslint-disable-next-line functional/immutable-data
      items.sort(
        (left, right) =>
          (left.parameters?.order ?? Number.MAX_SAFE_INTEGER) - (right.parameters?.order ?? Number.MAX_SAFE_INTEGER),
      );
      return items;
    }),
    filter(items => !!items.length),
    share(),
  );

  constructor(private rootQueue: SchedulerQueueInstance, schedulerConfig?: LectaSchedulerConfig) {
    if (schedulerConfig?.limit) {
      this.store.updateFields({ remainingLimit: schedulerConfig.limit });
    }
    this.resetQueueSubscription = this.store
      .select('items')
      .pipe(
        map(items => !items.length),
        distinctUntilChanged(),
        debounceTime(RESET_QUEUE_TIMEOUT_MS),
      )
      .subscribe(() => {
        if (this.store.get('items').length === 0) {
          this.resetQueue();
        }
      });
  }

  schedule<V, S = ScheduledInferredData<V>>(
    scheduleFactory: Scheduled<S>,
    settings: SchedulingSettings = {},
  ): Observable<S> {
    const scheduled$ = this.rootQueue
      ? this.rootQueue.schedule(() => this.scheduleInternal(scheduleFactory, settings), settings)
      : this.scheduleInternal(scheduleFactory, settings);
    return scheduled$.pipe(takeUntil(this.terminated$));
  }

  terminate(): void {
    this.store.updateField('terminated', true);
    setTimeout(() => this.store.updateField('terminated', false), 0);
  }

  skip(): Observable<never> {
    this.increaseLimit();
    return EMPTY;
  }

  destroy(): void {
    this.resetQueue();
    this.resetQueueSubscription.unsubscribe();
  }

  private scheduleInternal<V, S = ScheduledInferredData<V>>(
    scheduleFactory: Scheduled<S>,
    parameters: SchedulingSettings = {},
  ): Observable<S> {
    const bufferItem = { scheduleFactory, parameters };
    this.pushToBuffer(bufferItem);
    return this.addToQueueFromBuffer(bufferItem).pipe(map(data => tryUnwrapScheduledData(data)));
  }

  private addToQueueFromBuffer<V>(bufferItem: BufferScheduleItem<V>): QueueResult<V> {
    return this.schedulingBuffer.pipe(
      filter(([first]) => first === bufferItem),
      map(() => this.getFromBuffer<V>()),
      filterEmpty(),
      take(1),
      switchMap(item => this.addToQueue<V>(item)),
      finalize(() => {
        this.removeFromBuffer(bufferItem);
      }),
      takeUntil(this.terminated$),
    );
  }

  private addToQueue<V>(bufferItem: BufferScheduleItem<V>): QueueResult<V> {
    const { parameters } = bufferItem;
    if (!parameters?.ignoreLimit) {
      const { remainingLimit } = this.store.get();
      if (remainingLimit > UNLIMITED_SCHEDULER_LIMIT && remainingLimit === 0) {
        return EMPTY;
      }
      this.decreaseLimit();
    }
    const stream$ = this.createStream(bufferItem).pipe(takeUntil(this.terminated$));
    return this.updateQueue(stream$);
  }

  private createStream<V>(bufferItem: BufferScheduleItem<V>): QueueResult<V> {
    return defer(() => {
      const [source, schedulingSettings] = SchedulerQueueInstance.unwrapStream(bufferItem.scheduleFactory);
      return source.pipe(
        this.checkTermination(),
        finalize(() => {
          schedulingSettings?.teardown?.();
        }),
      );
    });
  }

  private checkTermination<V>(): MonoTypeOperatorFunction<V | ScheduledData<V>> {
    let settings: SchedulingSettings | undefined;
    return pipe(
      tap(data => {
        if (isScheduledData(data)) {
          settings = data.schedulingSettings;
        }
      }),
      materialize(),
      map(data => {
        if (data.kind === 'C' && settings?.terminateOnComplete) {
          this.terminate();
        }
        return data;
      }),
      dematerialize(),
    );
  }

  private decreaseLimit(): void {
    const { remainingLimit } = this.store.get();
    this.store.updateFields({ remainingLimit: remainingLimit - 1 });
  }

  private increaseLimit(): void {
    const { remainingLimit } = this.store.get();
    if (remainingLimit <= UNLIMITED_SCHEDULER_LIMIT) {
      return;
    }
    this.store.updateFields({ remainingLimit: remainingLimit + 1 });
  }

  private pushToBuffer<V>(param: BufferScheduleItem<V>): void {
    const items = this.store.get('items');
    this.store.updateFields({ items: [...items, param] });
  }

  private getFromBuffer<V>(): BufferScheduleItem<V> | undefined {
    const items = this.store.get('items');
    return items[0] as BufferScheduleItem<V>;
  }

  private removeFromBuffer<V>(bufferItem: BufferScheduleItem<V>): void {
    const items = this.store.get('items');
    if (items.includes(bufferItem)) {
      const updatedItems = items.filter(item => item !== bufferItem);
      this.store.updateFields({ items: updatedItems });
    }
  }

  private updateQueue<V>(stream$: Observable<V | ScheduledData<V>>): Observable<V | ScheduledData<V>> {
    const schedulingStream = stream$.pipe(publish(), refCount());
    const result$ = this.queue$ ? concat(this.queue$.pipe(ignoreElements()), schedulingStream) : schedulingStream;
    this.queue$ = result$;
    return result$;
  }

  private resetQueue(): void {
    this.queue$ = undefined;
    this.store.updateFields({ items: [] });
  }

  private static unwrapStream<V>(scheduled: Scheduled<V>): [QueueResult<V>, SchedulingInput<V>['schedulingSettings']] {
    const result = scheduled();
    return result instanceof Observable ? [result, undefined] : [result.source(), result.schedulingSettings];
  }
}
