import { isObservable, NextObserver, Observable, OperatorFunction, pipe } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import nonNil from './fp/non-nil';

/**
 * Input accepted by `applyASAP`: a `NextObserver` whose `next` callback
 * receives either the raw value (`T`) or a transformed value (`R`).
 */
interface Context<T = any, R = any> extends NextObserver<T | R> {
  /**
   * Optional operator inserted into the pipeline when `source` is an
   * Observable. It is not used when `source` is a plain value.
   */
  operators?: OperatorFunction<T, R>;

  /** Either a value that is already available or an Observable of such values. */
  source: T | Observable<T>;
}

/**
 * Hands a value to `context.next` as soon as one is available.
 *
 * - If `context.source` is not an Observable, it is passed to `context.next`
 *   synchronously and unchanged (`context.operators` is not applied), and
 *   the function returns `undefined`.
 * - If `context.source` is an Observable, it is piped through
 *   `filter(nonNil)`, then `context.operators` (or a no-op `pipe()` when none
 *   is given), then `take(1)`, so at most one value reaches `context.next`.
 *
 * Only `next` is forwarded to the subscription; no error or completion
 * callback is passed to `subscribe`.
 *
 * @param context - The source plus the `next` callback and optional operators.
 * @returns Whatever `subscribe` returns for an Observable source; otherwise
 *   `undefined`.
 */
export const applyASAP = <T, R>(context: Context<T, R>): any => {
  if (!isObservable(context.source)) {
    // Plain value: nothing to wait for, deliver it right away.
    context.next(context.source);
    return undefined;
  }

  const source$ = context.source as Observable<T>;
  // NOTE(review): `nonNil` is defined in ./fp/non-nil; presumably it rejects
  // null/undefined emissions — confirm against that module.
  const firstValue$ = source$.pipe(filter(nonNil), context.operators || pipe(), take(1));

  // Look up `context.next` at emission time rather than binding it up front.
  return firstValue$.subscribe(value => context.next(value));
};
