import {
  Subject, of, BehaviorSubject, fromEvent, firstValueFrom, throwError, timer, EMPTY,
  merge,
} from 'rxjs';
import {
  switchMap, distinctUntilChanged, share, map, withLatestFrom, filter, takeUntil,
  delay, startWith, tap, mapTo,
} from 'rxjs/operators';
import makeWebSocketsObservable from './BSWebSockets';
import BSStream from './BSStream';
import { uniqueID, concatArr8 } from './Utils';

// Byte value of ':' — a payload starting with it carries a chunk header.
const CHUNK_MARKER = ':'.codePointAt(0);

/**
 * Decodes one incoming WebSocket payload, reassembling chunked messages.
 *
 * A payload whose first byte is ':' is treated as a chunk. Its header is the
 * text between that ':' and the next ':' and has the form
 * `<identifier> <index>[$]`, where a trailing `$` marks the final chunk. The
 * chunk body starts two bytes after the closing ':' (one byte is skipped).
 * Any other payload is decoded as a complete message.
 *
 * NOTE(review): assumes `data` is an ArrayBuffer (binary frames) — confirm
 * against the socket setup in ./BSWebSockets.
 *
 * @param {ArrayBuffer} data - raw payload of the message event
 * @param {Map<string, {i: number, stringU8: Uint8Array}>} ongoingMessages -
 *   partially received messages by identifier; mutated by this function
 * @returns {string|null} the decoded text once a message is complete, or
 *   `null` while further chunks are still expected (or the chunk was dropped
 *   because it arrived out of sequence)
 * @throws {Error} if the chunk header cannot be parsed
 */
function decodeIncomingMessage(data, ongoingMessages) {
  const u8arr = new Uint8Array(data);

  // not a chunk: the payload is the whole message
  if (u8arr[0] !== CHUNK_MARKER) {
    return new TextDecoder().decode(u8arr);
  }

  const nextColonAt = u8arr.indexOf(CHUNK_MARKER, 1);
  if (nextColonAt === -1) {
    throw new Error('Malformed chunk: header is not terminated by ":"');
  }
  const bytesPrefix = new TextDecoder().decode(u8arr.slice(1, nextColonAt));
  const remaining = u8arr.subarray(nextColonAt + 2);

  const spaceAt = bytesPrefix.lastIndexOf(' ');
  const identifier = bytesPrefix.slice(0, spaceAt);
  const indexMatch = bytesPrefix.slice(spaceAt + 1).match(/(\d+)(\$)?/);
  if (!indexMatch) {
    throw new Error(`Malformed chunk: no chunk index in header "${bytesPrefix}"`);
  }
  const [, i, isFinal] = indexMatch;
  const iNum = parseInt(i, 10);

  const previous = ongoingMessages.get(identifier);
  if (iNum === 0) {
    // first chunk (re)starts the message
    ongoingMessages.set(identifier, { i: 0, stringU8: remaining });
  } else if (previous && iNum === previous.i + 1) {
    // next chunk in sequence: append its bytes
    ongoingMessages.set(identifier, {
      i: iNum,
      stringU8: concatArr8(previous.stringU8, remaining),
    });
  } else {
    // out-of-sequence chunk: the message cannot be completed anymore
    ongoingMessages.delete(identifier);
  }

  // After the update above an entry exists only if this chunk was accepted.
  const current = ongoingMessages.get(identifier);
  if (!isFinal || !current) return null;

  ongoingMessages.delete(identifier);
  return new TextDecoder().decode(current.stringU8);
}

/**
 * Manages a WebSocket connection: connecting on demand, sending JSON
 * messages, reassembling and parsing incoming messages, JSON-RPC style
 * actions/requests, keep-alive pings and an inactivity timeout.
 *
 * Events (dispatched as CustomEvents, see `on`):
 * - `connected` — detail is a boolean, the current value of `connected$`
 * - `messageerror` — detail is the Error raised while decoding or parsing an
 *   incoming message; the offending message is dropped
 */
export default class BSManager extends EventTarget {
  /**
   * @param {string} url - absolute `ws://` / `wss://` URL, or a path that is
   *   resolved against `window.location` (http → ws, https → wss)
   */
  constructor(url) {
    super();

    // calculate URL of websocket connection
    if (!url.match(/^wss?:\/\//)) {
      const { protocol, host } = window.location;
      const wsProtocol = protocol.replace(/^http/, 'ws');
      this.url = `${wsProtocol}//${host}${url}`;
    } else {
      this.url = url;
    }

    // for on, off, emit: handler -> (event type -> wrapper passed to addEventListener)
    this.handlersToFns = new Map();

    // to send structured data to the webserver, next send$
    this.send$ = new Subject();

    // long incoming messages are divided into chunks; a Map (not a plain
    // object) because the identifiers come from the network
    const ongoingMessages = new Map();

    // by setting pleaseBeConnected$, one initiates
    // WS connection attempts
    this.pleaseBeConnected$ = new Subject();
    const websockets$ = this.pleaseBeConnected$.pipe(
      switchMap((beConnected) => {
        if (beConnected) {
          return makeWebSocketsObservable(this.url);
        }
        return of(null);
      }),
      distinctUntilChanged(),
      // partial messages never continue on another socket
      tap(() => { ongoingMessages.clear(); }),
      share(),
    );

    // connect send$ to the latest websocket
    this.send$.pipe(
      withLatestFrom(websockets$),
    ).subscribe(([data, ws]) => {
      if (!ws) throw new Error('Attempted to send data while not connected');
      ws.send(JSON.stringify(data));
    });

    // read incoming messages
    this.incoming$ = new Subject();
    websockets$.pipe(
      filter((ws) => ws !== null),
      switchMap((ws) => fromEvent(ws, 'message')),
      switchMap((event) => {
        // A bad message must not error incoming$: that would also error
        // config$, responses$ and connected$ and break the manager for good.
        try {
          const str = decodeIncomingMessage(event.data, ongoingMessages);
          return str === null ? EMPTY : of(JSON.parse(str));
        } catch (err) {
          console.error('BSManager: dropping undecodable message', err);
          this.emit('messageerror', err);
          return EMPTY;
        }
      }),
    ).subscribe(this.incoming$);

    // incoming JSON-RPC responses
    this.responses$ = this.incoming$.pipe(
      filter((data) => data.jsonrpc === '2.0' && ('result' in data || 'error' in data)),
      share(),
    );

    const config$ = this.incoming$.pipe(
      filter((msg) => msg.type === 'config'),
      map((msg) => msg.data),
      share(),
    );

    // create connected$ to show WebSocket connection status (after config is received)
    this.connected$ = new BehaviorSubject(false);
    merge(
      config$.pipe(
        mapTo(true),
      ),
      websockets$.pipe(
        filter((x) => !x),
        mapTo(false),
      ),
    ).pipe(
      distinctUntilChanged(),
    ).subscribe(this.connected$);
    this.connected$.subscribe((status) => this.emit('connected', status));

    // ping: config.pingInterval is scaled by 1e3 and used as a timer period,
    // i.e. it is treated as seconds; the randomness spreads pings out
    config$.pipe(
      map((config) => (config.pingInterval - Math.random()) * 1e3),
      switchMap(
        // first ping after a random fraction of the interval, then periodically
        (iv) => timer(iv * Math.random(), iv).pipe(
          takeUntil(this.connected$.pipe(filter((x) => !x))),
        ),
      ),
    ).subscribe(() => this.send({ type: 'ping' }));

    // timeout: close the socket when nothing was received for
    // pingInterval + 10 (seconds); every incoming message restarts the delay
    config$.pipe(
      switchMap((config) => this.incoming$.pipe(
        startWith(null),
        switchMap(() => of(true).pipe(
          delay((config.pingInterval + 10) * 1e3),
          takeUntil(this.connected$.pipe(filter((x) => !x))),
        )),
      )),
      withLatestFrom(websockets$),
    ).subscribe(([, ws]) => ws.close());
  }

  /** Requests that a WebSocket connection be established. */
  connect() {
    this.pleaseBeConnected$.next(true);
  }

  /** Requests that the WebSocket connection be dropped. */
  disconnect() {
    this.pleaseBeConnected$.next(false);
  }

  /**
   * Sends a JSON-serializable message over the current WebSocket.
   *
   * @param {object} data - message to send
   * @throws {Error} if `connected$` is false, unless `data.type` is `'ping'`
   */
  send(data) {
    if (!this.connected$.getValue() && data.type !== 'ping') {
      throw new Error("Can't send data because WebSocket not available");
    }
    this.send$.next(data);
  }

  /**
   * Sends a `doAction` notification (a JSON-RPC message without id, so no
   * response is awaited).
   *
   * @param {string} streamName
   * @param {string} actionName
   * @param {*} [payload=null]
   */
  doAction(streamName, actionName, payload = null) {
    this.send({
      jsonrpc: '2.0',
      method: 'doAction',
      params: [streamName, actionName, payload],
    });
  }

  /**
   * Sends a `doRequest` call and waits for the response with the same id.
   *
   * @param {string} streamName
   * @param {string} requestName
   * @param {*} [payload=null]
   * @returns {Promise<*>} resolves with the response's `result`; rejects with
   *   the response's `error`, or — because the response stream then completes
   *   without a value — when `connected$` turns false before a response arrives
   */
  doRequest(streamName, requestName, payload = null) {
    const id = uniqueID();

    this.send({
      jsonrpc: '2.0',
      method: 'doRequest',
      params: [streamName, requestName, payload],
      id,
    });

    return firstValueFrom(
      this.responses$.pipe(
        filter((data) => data.jsonrpc === '2.0' && data.id === id),
        takeUntil(this.connected$.pipe(filter((c) => !c))),
        switchMap((data) => {
          if ('result' in data) return of(data.result);
          // factory form: passing the error value directly is deprecated
          return throwError(() => data.error);
        }),
      ),
    );
  }

  /**
   * Creates a stream object bound to this manager.
   *
   * @param {string} streamName
   * @returns {BSStream}
   */
  newStream(streamName) {
    return new BSStream(streamName, this);
  }

  /**
   * Registers `handler` for events of `type`; it is called with the event's
   * `detail`. Like `addEventListener`, registering the same handler for the
   * same type again is a no-op that returns the existing wrapper.
   *
   * @param {string} type
   * @param {function(*): void} handler
   * @returns {function(Event): void} the wrapper actually registered as the
   *   listener; it can be passed to `off` as well
   */
  on(type, handler) {
    let fnsByType = this.handlersToFns.get(handler);
    if (!fnsByType) {
      fnsByType = new Map();
      this.handlersToFns.set(handler, fnsByType);
    }

    const existing = fnsByType.get(type);
    if (existing) return existing;

    const fn = (event) => handler(event.detail);
    fnsByType.set(type, fn);
    this.addEventListener(type, fn);
    return fn;
  }

  /**
   * Removes a listener for events of `type`.
   *
   * @param {string} type
   * @param {Function} handlerOrFn - the handler given to `on`, the wrapper
   *   returned by `on`, or a listener added via `addEventListener`
   */
  off(type, handlerOrFn) {
    // covers wrappers returned by on() and listeners added directly
    this.removeEventListener(type, handlerOrFn);

    const fnsByType = this.handlersToFns.get(handlerOrFn);
    if (fnsByType) {
      const fn = fnsByType.get(type);
      if (fn) {
        this.removeEventListener(type, fn);
        fnsByType.delete(type);
      }
      if (fnsByType.size === 0) this.handlersToFns.delete(handlerOrFn);
      return;
    }

    // handlerOrFn may be a wrapper returned by on(): forget it, so that a
    // later on() with the same handler registers a fresh listener
    this.handlersToFns.forEach((byType, handler) => {
      if (byType.get(type) === handlerOrFn) {
        byType.delete(type);
        if (byType.size === 0) this.handlersToFns.delete(handler);
      }
    });
  }

  /**
   * Dispatches a CustomEvent of `type` carrying `detail`.
   *
   * @param {string} type
   * @param {*} detail
   */
  emit(type, detail) {
    this.dispatchEvent(new CustomEvent(type, { detail }));
  }
}
