import { Injectable, OnDestroy } from '@angular/core';
import { Subject, pairs, timer, Observable } from 'rxjs';
import { mergeMap, filter, map, takeUntil } from 'rxjs/operators';

import { ApiService } from '@app/api/api';
import { NotificationService } from '@app/services/notification.service';
import { TaskCountService } from '@app/services/task.count.service';


class StatusOptions {
  message?: string;
  update?: boolean;
  final?: boolean;
  callback?: (task_result: any) => StatusOptions | undefined;

  constructor(
    message: string = undefined,
    update: boolean = false,
    final: boolean = false,
    callback: (any) => StatusOptions | undefined
  ) {
    this.update = update;
    this.final = final;
    if (callback !== undefined) {
      this.callback = callback
    }
    if (message !== undefined) {
      this.message = message
    }

  }
}

class TaskStatus {
  __last_state: string;
  task_complete: boolean;
  message?: string;
  queued?: StatusOptions;
  finished?: StatusOptions;
  failed?: StatusOptions;
  started?: StatusOptions;
  deferred?: StatusOptions;
  result: any;


  constructor() {
    this.__last_state = '';
    this.task_complete = false;
    this.result = null
  }

  update(result) {
    // if the status has changed and the status has options defined
    const options = (result.status !== this.__last_state && result.status in this);
    this.__last_state = result.status;
    return options
  }

  complete(result) {
    this.task_complete = true;
    this.result = result
  }
}

@Injectable()
export class TaskService implements OnDestroy {

  // consider creating object and using Observable.pairs to iterate over keys and values
  // This would allow callbacks on task completion
  tasks: { [id: string]: TaskStatus } = {};
  private update = new Subject<any>();
  current_tasks: number = 0;
  unSubscribe = new Subject();
  empty_obs = new Observable();
  task_retry = true;
  get pending_tasks() {
    return pairs(this.tasks)
      .pipe(
        filter(pair => {
          let value = <TaskStatus>pair[1];
          return (!value.task_complete)
        }),
        map(pair => <string>pair[0])
      )
  }

  // Observable streams
  update$ = this.update.asObservable();
  // Service message commands

  constructor(
      private api: ApiService,
      public notification: NotificationService,
      private taskCountService: TaskCountService
  ) {
    timer(0, 5000).subscribe((res: any) => {
      this.pending_tasks
        .pipe(
          mergeMap(job => {
            return this.handle_retry_task_call(job);
          })
        ).pipe(takeUntil(this.unSubscribe)).subscribe((res: any) => {
          this.task_retry = true;
          this.update_task(res)
        })
    })
  }

  handle_retry_task_call(job) {
    if (!this.task_retry) {
      return this.empty_obs;
    }
    this.task_retry = false;
    return this.api.task.detail(job);
  }

  update_task(task_result) {
    const task = this.tasks[task_result.id];
    if (task.update(task_result)) {
      // if this task has options defined for this status
      let options = task[task_result.status];
      if ('callback' in options) {
        const result = options.callback(task_result);
        if (result !== undefined) {
          options = result
        }
      }
      if ('message' in options) {
        this.notification.add_notification(options.message, null, 6000)
      }
      if (options.update) {
        this.update.next(task_result);
      }
      if (options.final || ['finished', 'failed'].indexOf(task_result.status) > -1) {
        task.complete(task_result);
        this.current_tasks--;
        this.taskCountService.getTaskCount.next(this.current_tasks);
      }
    } else {
      // if the task has no status options defined we only show a message if the task fails or completes
      if (['finished', 'failed'].indexOf(task_result.status) > -1) {
        this.notification.add_notification('Task ' + task_result.id + ' ' + task_result.status, null, 6000);
        if (task_result.status=='failed'){
          console.error(task_result.result)
        }
        task.complete(task_result);
        this.current_tasks--;
        this.taskCountService.getTaskCount.next(this.current_tasks);
      }
    }
  }


  add_task(task_id: string, options = {}) {
    const message = ('message' in options) ? options['message'] : 'New task ' + task_id;
    this.notification.add_notification(message);
    this.tasks[task_id] = Object.assign(new TaskStatus(), options);
    this.current_tasks++;
    this.taskCountService.getTaskCount.next(this.current_tasks);
  }

  ngOnDestroy() {
    this.unSubscribe.next(1);
    this.unSubscribe.complete();
  }
}
