import { Injectable } from "@angular/core";
import {
  BehaviorSubject,
  Subscription,
  merge,
  timer,
  interval,
  pipe,
} from "rxjs";
import { map, filter, bufferCount, switchMap } from "rxjs/operators";
import { ApiService } from "@app/api/api";
import { RealTimeService } from "./realtime.service";
import { AircraftStatus } from "@app/api/types";

interface ACSResult {
  latest: AircraftStatus[];
}

@Injectable()
export class ACSService {
  private $latest: Subscription;
  private _status: BehaviorSubject<AircraftStatus[]> = new BehaviorSubject<
    AircraftStatus[]
  >([]);

  get status() {
    return merge(
      this._status.asObservable(),
      timer(5, 30 * 1000).pipe(map((e) => this._status.getValue()))
    );
  }

  aircraft_status(aircraft: number) {
    return this.status.pipe(
      map((statuses: AircraftStatus[]) => {
        return statuses.find(
          (status: AircraftStatus) => status.properties.aircraft == aircraft
        );
      }),
      filter((e) => e != null)
    );
  }

  constructor(private api: ApiService, private rts: RealTimeService) {}

  update_status(updates: AircraftStatus[]) {
    if (updates.length == 0) return;
    let state = this._status.getValue();
    updates.forEach(
      (update: AircraftStatus, _i: number, _a: AircraftStatus[]) => {
        let ndx = state.findIndex(
          (value, _index, _array) =>
            update.properties.aircraft == value.properties.aircraft
        );
        if (ndx >= 0) {
          if (state[ndx].properties.timestamp <= update.properties.timestamp) {
            state[ndx] = update;
          }
        } else {
          state.push(update);
        }
      }
    );
    this._status.next([...state]);
  }

  start() {
    if (this.$latest && !this.$latest.closed) {
      console.log("ACS service already running");
      return;
    }
    console.log("Starting ACS Service");
    this.api.aircraft_status.latest().subscribe({
      next: (res: ACSResult) => {
        if (res.latest) {
          this.update_status(res.latest);
        }
        this._status.next(res.latest);
        this.$latest = this.rts.connected
          .pipe(
            switchMap((connected: boolean) => {
              if (connected) {
                console.log("Using Realtime Service for location updates");
                return this.rts
                  .messages("acs update")
                  .pipe(bufferCount(1))
                  .pipe(map((u) => u.map((e) => e.update)));
              } else {
                console.log("Using API for location updates");
                return interval(5000).pipe(
                  switchMap((e) =>
                    this.api.aircraft_status
                      .latest()
                      .pipe(map((e: any) => e.latest))
                  )
                );
              }
            })
          )
          .pipe(filter((e) => e != null))
          .subscribe({
            next: (update: AircraftStatus[]) => {
              this.update_status(update);
            },
            error: (error) => {
              console.debug("error retrieving aircraftstatus", error);
            },
          });
      },
      error: (error) => {
        console.debug("error retrieving aircraftstatus", error);
      },
    });
  }

  stop() {
    if (this.$latest) this.$latest.unsubscribe();
  }
}
