import {
  BehaviorSubject, from, EMPTY, firstValueFrom, of, Subject, merge, ReplaySubject,
} from 'rxjs';
import {
  map, combineLatestWith, concatMap, filter, switchMap, delay,
  distinctUntilChanged, share, scan, tap,
} from 'rxjs/operators';
import patchFn from './StructDiff';
import { dclone } from './Utils';

export default class BSStream extends EventTarget {
  constructor(name, manager) {
    super();
    this.name = name;
    this.manager = manager;
    this.lastId = null;

    this.handlersToFns = new Map();

    this.desiredJoinState$ = new BehaviorSubject(false); // user's stated wishes (non-null)
    this.joinState$ = new BehaviorSubject(false); // joined || leaving
    this.futureState$ = new BehaviorSubject(false); // joining || joined

    // derived
    this.stabilizedJoinState1$ = new BehaviorSubject(false);
    this.stabilizedJoinState2$ = new BehaviorSubject(false);

    this.EPSs$ = new Subject(); // stream of event patches and states
    this.initialStates$ = this.EPSs$.pipe(
      filter((x) => x.type === 'state'),
      map((x) => x.data),
      share(),
    );
    this.eventsPatches$ = this.EPSs$.pipe(
      filter((x) => x.type === 'eventPatch'),
      map((x) => ({
        id: x.id,
        ...(('event' in x) && { event: x.event }),
        ...(('patch' in x) && { patch: x.patch }),
      })),
      share(),
    );
    this.eventsWithIds$ = this.eventsPatches$.pipe(
      filter((x) => x.event != null),
      map((x) => ({ id: x.id, event: x.event })),
    );
    this.events$ = this.eventsWithIds$.pipe(
      map((x) => x.event),
    );
    this.state = undefined;
    this.state$ = new ReplaySubject(1);

    // set stream's state on every state & eventPatch message
    // useful because the type of state might change, and because
    // of initialStates$
    this.state$.subscribe((s) => { this.state = s; });
    this.state$.subscribe((state) => this.emit('state', state));
    this.eventsPatches$.subscribe((eventPatch) => this.emit('eventPatch', eventPatch));
    this.eventsWithIds$.subscribe((eventWithId) => this.emit('eventWithId', eventWithId));
    this.events$.subscribe((event) => this.emit('event', event));
    this.initialStates$.subscribe((iState) => this.emit('initialState', iState));

    // send join or leave request (returns state and past events)
    const sendJoinOrLeave = (desiredJoinState) => this.manager.doRequest(
      '!open',
      desiredJoinState ? 'join' : 'leave',
      desiredJoinState ? {
        name: this.name,
        last_id: this.lastId,
      } : this.name,
    );

    let epRepository = [];

    // these subscriptions will be unsubscribed from on channel destruction
    this.subscriptionsToDestroy = [

      // keep state and state$ up to date
      merge(
        this.initialStates$.pipe(
          map((x) => [x, null]),
        ),

        this.eventsPatches$.pipe(
          filter((x) => 'patch' in x),
          map((x) => [null, x.patch]),
        ),
      ).pipe(
        scan((acc, [initialState, diff]) => {
          if (initialState !== null) return initialState;
          return dclone(patchFn(acc, diff));
        }, undefined),
        tap((x) => { this.state = x; }),
      ).subscribe(this.state$),

      // on disconnect, set not-joined (left)
      this.manager.connected$.pipe(
        filter((x) => !x),
      ).subscribe(() => {
        this.futureState$.next(false);
        this.joinState$.next(false);
      }),

      // fill epRepository if we must with incoming event-patches
      this.manager.incoming$.pipe(
        filter((msg) => msg.stream === name && msg.type === 'eventPatch'),
        map((msg) => dclone(msg)),
      ).subscribe((ep) => {
        // eslint-disable-next-line no-param-reassign
        delete ep.stream;
        if (epRepository) epRepository.push(ep);
        else if (this.joinState$) {
          if (ep.id > this.lastId) {
            this.EPSs$.next(ep);
            this.lastId = ep.id;
          }
        }
      }),

      // set or remove epRepository
      this.joinState$.pipe(
        combineLatestWith(this.futureState$),
        filter(([present]) => !present),
      ).subscribe(([, future]) => {
        if (future) epRepository ??= [];
        else epRepository = null;
      }),

      // set stabilized join states
      this.joinState$.pipe(
        combineLatestWith(this.futureState$),
        map(([present, future]) => (present === future ? present : null)),
        switchMap((x) => (x === null ? of(null) : of(x).pipe(delay(100)))),
        distinctUntilChanged(),
      ).subscribe(this.stabilizedJoinState1$),

      this.joinState$.pipe(
        combineLatestWith(this.futureState$, this.manager.connected$, this.desiredJoinState$),
        switchMap(([present, future, connected, desired]) => {
          if (!connected) return desired ? of(null) : of(false).pipe(delay(100));
          return present === future ? of(present).pipe(delay(100)) : of(null);
        }),
        distinctUntilChanged(),
      ).subscribe(this.stabilizedJoinState2$),

      // send join or leave requests at the right times, and process results
      this.desiredJoinState$.pipe(
        combineLatestWith(this.manager.connected$),
        concatMap(([desired, connected]) => {
          if (!connected) return EMPTY;
          if (desired !== this.desiredJoinState$.getValue()) return EMPTY;
          if (desired === this.futureState$.getValue()) return EMPTY;
          this.futureState$.next(desired);
          return from(
            sendJoinOrLeave(desired)
              .then(
                (stateAndPastEvents) => ({ ok: true, desired, stateAndPastEvents }),
                () => ({ ok: false, desired }),
              ),
          );
        }),
      ).subscribe(({ ok, desired, stateAndPastEvents }) => {
        if (ok) {
          if (desired) {
            // push to EPSs$ and set joinState
            const { events, state: { data: state, id: stateId } } = stateAndPastEvents;
            if (stateId < (this.lastId ?? 0)) throw new Error('state.id < this.lastId');
            events.forEach(({ event, id: eventId }) => {
              if (eventId <= (this.lastId ?? 0)) return;
              this.EPSs$.next({
                type: 'eventPatch',
                id: eventId,
                event,
              });
            });
            this.EPSs$.next({
              type: 'state',
              id: stateId,
              data: state,
            });
            this.lastId = stateId;
            epRepository.forEach((ep) => {
              if (ep.id > this.lastId) {
                this.EPSs$.next(ep);
                this.lastId = ep.id;
              }
            });
            epRepository = null;
          }
          this.joinState$.next(desired);
        } else this.futureState$.next(this.joinState$.getValue());
      }),
    ];
  }

  // Note: test that this works
  async destroy() {
    this.desiredJoinState$.next(false);
    await firstValueFrom(this.stabilizedJoinState1$.pipe(filter((x) => x === false)));
    this.subscriptionsToDestroy.forEach((s) => s.unsubscribe());
    [
      'desiredJoinState', 'joinState', 'futureState',
      'stabilizedJoinState1', 'stabilizedJoinState2',
      'EPSs', 'state',
    ].forEach((subjectName) => this[`${subjectName}$`].complete());
  }

  async joinOrLeave(value) {
    this.desiredJoinState$.next(value);
    const eventualState = await firstValueFrom(
      (value ? this.stabilizedJoinState2$ : this.stabilizedJoinState1$).pipe(
        filter((x) => x !== null),
      ),
    );
    if (eventualState === value) return;
    const verb = value ? 'join' : 'leave';
    if (this.desiredJoinState$.getValue() !== value) throw new Error(`${verb} cancelled`);
    throw new Error(`Couldn't ${verb}`);
  }

  async join() {
    await this.joinOrLeave(true);
  }

  async leave() {
    await this.joinOrLeave(false);
  }

  async doAction(actionName, payload = null, { waitForJoin } = {}) {
    const stabilizedState = await firstValueFrom(
      (waitForJoin ? this.stabilizedJoinState2$ : this.stabilizedJoinState1$)
        .pipe(filter((x) => x !== null)),
    );
    if (!stabilizedState) throw new Error("not joined stream, can't do action");

    this.manager.doAction(this.name, actionName, payload);
  }

  async doRequest(requestName, payload = null, { waitForJoin } = {}) {
    const stabilizedState = await firstValueFrom(
      (waitForJoin ? this.stabilizedJoinState2$ : this.stabilizedJoinState1$)
        .pipe(filter((x) => x !== null)),
    );
    if (!stabilizedState) throw new Error("not joined stream, can't do request");

    return this.manager.doRequest(this.name, requestName, payload);
  }

  on(type, handler) {
    const fn = (event) => handler(event.detail);
    this.handlersToFns.set(handler, fn);
    this.addEventListener(type, fn);
    return fn;
  }

  off(type, handlerOrFn) {
    this.removeEventListener(type, handlerOrFn);
    const maybeFn = this.handlersToFns.get(handlerOrFn);
    if (maybeFn) this.removeEventListener(type, maybeFn);
  }

  emit(type, detail) {
    this.dispatchEvent(new CustomEvent(type, { detail }));
  }
}
