import { asyncScheduler, Observable, OperatorFunction, SchedulerLike, Subscription } from 'rxjs';

/**
 * Collects source values into arrays using a delay window that opens on demand.
 *
 * A window opens when a value arrives while no window is open. Every value
 * received until `delayTimeMs` has elapsed (including the one that opened the
 * window) is emitted as a single array when the window closes. The next value
 * after that opens a new window.
 *
 * - When the source completes, any pending window is cancelled, the values
 *   buffered so far (if any) are emitted immediately, and the result completes.
 * - When the source errors, the error is forwarded immediately; values still
 *   in the buffer are not emitted.
 * - When the consumer unsubscribes, both the source subscription and any
 *   pending scheduled flush are cancelled.
 *
 * @param delayTimeMs Length of the window, passed as the delay to `scheduler.schedule`.
 * @param scheduler Scheduler used to time the window; defaults to `asyncScheduler`.
 * @returns An operator that maps a stream of `T` to a stream of `T[]` batches.
 */
export function bufferDelay<T>(
  delayTimeMs: number,
  scheduler: SchedulerLike = asyncScheduler,
): OperatorFunction<T, T[]> {
  return (source: Observable<T>) => {
    return new Observable<T[]>(subscriber => {
      let bufferedValues: T[] = [];
      // Defined only while a window is open, i.e. a flush is scheduled but has not run yet.
      let schedulerSubscription: Subscription | undefined;

      const sourceSubscription = source.subscribe({
        next: value => {
          // eslint-disable-next-line functional/immutable-data
          bufferedValues.push(value);

          if (!schedulerSubscription) {
            setupScheduler();
          }
        },
        error: error => subscriber.error(error),
        complete: () => onSourceComplete(),
      });

      return () => {
        // Cancel a pending flush as well, otherwise its timer outlives the subscription.
        cancelScheduledFlush();
        sourceSubscription.unsubscribe();
      };

      function cancelScheduledFlush(): void {
        if (schedulerSubscription) {
          schedulerSubscription.unsubscribe();
          schedulerSubscription = undefined;
        }
      }

      function onSourceComplete(): void {
        cancelScheduledFlush();

        // Flush whatever was collected in the window that was cut short.
        if (bufferedValues.length) {
          subscriber.next(bufferedValues);
        }

        subscriber.complete();
      }

      function setupScheduler(): void {
        // Set when the scheduled work has run. A scheduler may run the work
        // synchronously inside `schedule()`; in that case the returned handle
        // must not be stored, or the window would look open forever.
        let fired = false;

        const scheduled = scheduler.schedule(() => {
          fired = true;

          // Reset state before emitting so a value arriving during the
          // emission starts a fresh window and a fresh buffer.
          const readyToSendValues = bufferedValues;
          bufferedValues = [];
          schedulerSubscription = undefined;

          subscriber.next(readyToSendValues);
        }, delayTimeMs);

        if (!fired) {
          schedulerSubscription = scheduled;
        }
      }
    });
  };
}
