import { Injectable } from '@angular/core';
import { SignalingMessage } from 'src/app/models/signaling-message/signaling-message';
import { DataChannelService } from '../data-channel/data-channel.service';
import { Subscription } from 'rxjs';
import { AmplifyService } from '../amplify/amplify.service';
import { ConnectionState } from '@aws-amplify/pubsub';

/** Asynchronous handler that receives a single incoming signaling message. */
type MessageCallback = (message: any) => Promise<void>;

/**
 * Exchanges signaling messages for a room.
 *
 * Incoming messages arrive through a pub/sub subscription on the topic
 * `'signaling' + roomId`. Outgoing messages are sent over an open data channel
 * to the recipient when one exists (and the message carries no `ice` field),
 * and are published to the room topic otherwise.
 */
@Injectable({
  providedIn: 'root'
})
export class SignalingService {

  /** Number of messages passed to {@link send}, keyed by the recipient id (`msg.to`). */
  counts: { [key: string]: number } = {};
  /** Handler invoked for every received message that is not a handshake message. */
  onMessage: MessageCallback;
  /** Room whose topic is currently subscribed to. */
  roomId: string;
  /** Id of the local peer; also used to recognise our own handshake messages. */
  senderId: string;
  /** Active topic subscription, or `null` when none is held. */
  subscription: Subscription;

  /**
   * Interval that publishes the `'ready'` handshake message while a subscription
   * is being verified. Kept on the instance so it can be cancelled whenever the
   * subscription is dropped, not only when the handshake succeeds.
   */
  private readyTimer: ReturnType<typeof setInterval> | null = null;

  constructor(
    private dataChannel: DataChannelService,
    private appsync: AmplifyService,
  ) {}

  /**
   * Subscribes to the room topic, waits for the handshake to complete and then
   * registers a connection-state callback that drops or re-creates the
   * subscription on connection state changes.
   *
   * @param roomId Room to join; the topic is `'signaling' + roomId`.
   * @param senderId Id of the local peer.
   * @param onMessage Handler for received messages.
   */
  public async connect(roomId: string, senderId: string, onMessage: MessageCallback) {
    this.roomId = roomId;
    this.senderId = senderId;
    this.onMessage = onMessage;
    await this.setupOnMessage();
    this.appsync.setConnectionCallback((state) => {
      if (state === ConnectionState.ConnectedPendingKeepAlive) {
        // The subscription may already be gone (e.g. after reconnect()), so
        // this must tolerate a null subscription.
        this.dropSubscription();
      }
      if (state === ConnectionState.Disconnected) {
        // Fire-and-forget: surface failures instead of leaving an unhandled rejection.
        this.setupOnMessage().catch((err) => {
          console.error('SignalingService: Failed to resubscribe', err);
        });
      }
    });
  }

  /** Drops the current subscription (if any) and stops a pending handshake. */
  public reconnect() {
    this.dropSubscription();
  }

  /**
   * Removes the connection-state callback, drops the subscription, stops a
   * pending handshake and resets all per-connection state.
   */
  public async disconnect() {
    this.appsync.setConnectionCallback(null);
    this.onMessage = null;
    this.dropSubscription();
    this.roomId = null;
    this.senderId = null;
    this.counts = {};
  }

  /**
   * Sends a signaling message and increments the counter for its recipient.
   *
   * @param msg Message payload; `msg.to` identifies the recipient.
   * @param uid Optional id passed to `SignalingMessage`; falsy values become `null`.
   */
  public async send(msg: any, uid?: string) {
    this.createCount(msg.to);
    this.counts[msg.to]++;

    await this.processMessage(new SignalingMessage(uid || null, msg));
  }

  /** Initialises the counter for `id` to 0 when it has no (non-zero) value yet. */
  private createCount(id: string) {
    if (!this.counts[id]) {
      this.counts[id] = 0;
    }
  }

  /** Cancels the handshake interval and unsubscribes; safe to call when neither exists. */
  private dropSubscription() {
    if (this.readyTimer !== null) {
      clearInterval(this.readyTimer);
      this.readyTimer = null;
    }
    if (this.subscription) {
      this.subscription.unsubscribe();
      this.subscription = null;
    }
  }

  /**
   * (Re)creates the topic subscription and resolves once it is confirmed to be
   * delivering messages.
   *
   * Confirmation works by publishing a `'ready'` message addressed to ourselves
   * every 250 ms until more than three of them have been received back.
   * If the subscription is dropped before that happens the interval is
   * cancelled and the returned promise stays pending.
   */
  private async setupOnMessage() {
    // Also cancels a handshake interval left over from an earlier, unfinished setup.
    this.dropSubscription();

    const topic = 'signaling' + this.roomId;
    let isReady = false;
    let readyCount = 0;
    let timer: ReturnType<typeof setInterval> | null = null;
    let resolveReady: () => void;
    const ready = new Promise<void>((resolve) => {
      resolveReady = resolve;
    });

    const stopTimer = () => {
      if (timer === null) {
        return;
      }
      clearInterval(timer);
      // Only clear the shared handle if it still refers to this setup's interval.
      if (this.readyTimer === timer) {
        this.readyTimer = null;
      }
      timer = null;
    };

    this.subscription = await this.appsync.subscribe(topic, async (data) => {
      try {
        if (!data) {
          return;
        }
        const messages: any[] = JSON.parse(data.data);
        const first = messages[0];
        if (first && first.to === this.senderId && first.sender === this.senderId && first.message === 'ready') {
          // Our own handshake echo: count it, never forward it.
          readyCount++;
          if (!isReady && readyCount > 3) {
            isReady = true;
            stopTimer();
            resolveReady();
          }
          return;
        }
        for (const message of messages) {
          // disconnect() may clear the handler while a batch is being delivered.
          if (!this.onMessage) {
            return;
          }
          await this.onMessage(message);
        }
      } catch (err) {
        console.error('SignalingService: Invalid message', data, err instanceof Error ? err.message : err);
      }
    }, (err) => {
      console.error('SignalingService: Subscription error', err);
    });

    const readyPayload = JSON.stringify([{ to: this.senderId, sender: this.senderId, message: 'ready' }]);
    const announce = async () => {
      try {
        await this.appsync.publish(topic, readyPayload);
      } catch (err) {
        console.error('SignalingService: Failed to publish ready message', err);
      }
    };
    timer = setInterval(() => {
      void announce();
    }, 250);
    this.readyTimer = timer;

    await ready;
  }

  /**
   * Delivers a message to its recipient: over the recipient's data channel when
   * that channel is open and the message has no `ice` field, otherwise by
   * publishing it (wrapped in a one-element array) to the room topic.
   */
  private async processMessage(message: SignalingMessage) {
    if (this.dataChannel.dcs[message.msg.to]?.readyState === 'open' && !message.msg.ice) {
      await this.dataChannel.send(message.msg.to, 'signaling', JSON.stringify(message.msg));
    } else {
      await this.appsync.publish('signaling' + this.roomId, JSON.stringify([message.msg]));
    }
  }
}
