import { Injectable } from "@angular/core";
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { ConfigService } from "./config.service";
import {
  Subject,
  Subscription,
  timer,
  Observable,
  BehaviorSubject,
} from "rxjs";
import {
  filter,
  retryWhen,
  tap,
  delayWhen,
  map,
  finalize,
} from "rxjs/operators";
import { TokenService } from "./token.service";
import { EndpointRecordUpdate } from "@cleargrid/map_layers";

export interface Payload {
  id: number;
  payload: number;
  payload_ident: string;
  payload_name: string;
  payload_channel: string;
}

@Injectable()
export class RealTimeService {
  hostname: string = null;
  subscriptions: Set<string> = new Set();
  private _ws: WebSocketSubject<any> = null;
  private _ws_retries = 0;
  private ping$: Subscription = null;
  private auth$: Subscription = null;
  private _data: Subject<any> = new Subject<any>();
  private _connected: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(
    false
  );
  ping_interval = 30 * 1000;

  constructor(private config: ConfigService, private token: TokenService) {
    this.config.socket_server$.subscribe((hostname) => {
      this.hostname = hostname;
      this.connect();
    });
  }
  get connected() {
    return this._connected.asObservable();
  }

  subscribe(channel_name: string) {
    console.log(`Subscribing to ${channel_name}`);
    if (!this.subscriptions.has(channel_name)) {
      this.subscriptions.add(channel_name);
    }
    if (this._connected.getValue()) {
      this.send({ tag: "subscribe", channel: channel_name });
    }
  }

  unsubscribe(channel_name: string) {
    console.log(`Unsubscribing from ${channel_name}`);
    if (this.subscriptions.has(channel_name)) {
      this.subscriptions.delete(channel_name);
      if (this._connected.getValue()) {
        this.send({ tag: "unsubscribe", channel: channel_name });
      }
    }
  }

  retry_delay() {
    let delay = Math.min(2 ** this._ws_retries, 300);
    console.debug(`Delaying ${delay} seconds`);
    return delay * 1000;
  }

  connect() {
    if (!this.hostname) {
      console.error("invalid hostname " + JSON.stringify(this.hostname));
      return;
    }
    console.debug(`connecting websocket to ${this.hostname}`);
    const srv = this;
    if (this._ws) {
      console.log("Disconnecting existing websocket");
      this._ws.complete();
    }
    try {
      this._ws = webSocket<any>({
        url: this.hostname + "/ws/realtime/",
        openObserver: {
          next() {
            srv._connected.next(true);
            srv._ws_retries = 0;

            srv.subscriptions.forEach((channel) => srv.subscribe(channel));

            const interval = srv.ping_interval;
            srv.ping$ = timer(interval, interval).subscribe((seq) => {
              srv.ping(seq);
            });
            srv.auth$ = srv.token.follow().subscribe((token) => {
              srv.send({ tag: "auth", token: token });
            });
          },
        },
        closeObserver: {
          next(closeEvent) {
            console.debug(
              "closeObserver: Closing - " + JSON.stringify(closeEvent)
            );
            srv._connected.next(false);
            if (srv.ping$) srv.ping$.unsubscribe();
            if (srv.auth$) srv.auth$.unsubscribe();
          },
        },
      });
    } catch (err) {
      this._ws_retries++;
      console.error(
        `Error creating websocket (retry ${this._ws_retries}): ${err}`
      );
      setInterval(() => {
        this.connect();
      }, this.retry_delay());
      return;
    }

    this._ws
      .asObservable()
      .pipe(
        retryWhen((errors) =>
          errors.pipe(
            tap((val) => {
              console.error(`Connection closed ${val}`);
              this._ws_retries++;
            }),
            delayWhen(() => {
              return timer(this.retry_delay());
            })
          )
        )
      )
      .subscribe({
        next: (msg) => {
          this.handle_message(msg);
        },
        error: (error) => {
          this.handle_error(error);
        },
        complete: () => {
          console.debug("websocket observable complete");
        },
      });
  }

  handle_message(message) {
    // console.debug("received message: " + JSON.stringify(message));
    this._data.next(message);
  }

  handle_error(error) {
    console.error("error occurred: " + JSON.stringify(error));
  }

  send(message: any) {
    if (this._ws) {
      // console.debug("sending message: " + JSON.stringify(message));
      this._ws.next(message);
    }
  }

  ping(sequence: number) {
    const msg = { tag: "ping", sequence: sequence };
    this.send(msg);
  }

  messages(tag: string = undefined): Observable<any> {
    return this._data
      .asObservable()
      .pipe(filter((e) => (tag && e?.tag ? e.tag == tag : e)));
  }

  payload_subscribe(payload: Payload): Observable<PayloadUpdateMessage> {
    this.subscribe(payload.payload_channel);
    return this.messages("payload_updates").pipe(
      finalize(() => {
        this.unsubscribe(payload.payload_channel);
      }),
      map((e) => e as PayloadUpdateMessage),
      filter((e) => e.payload == payload.payload_ident)
    );
  }
}

export interface PayloadUpdateMessage {
  tag: string;
  payload: string;
  updates?: EndpointRecordUpdate[];
  refresh?: boolean;
}
