import { Injectable } from '@angular/core';
import { API, graphqlOperation } from '@aws-amplify/api';
import { CONNECTION_STATE_CHANGE, ConnectionState } from '@aws-amplify/pubsub';
import { Cache } from '@aws-amplify/cache';
import { Amplify, Hub } from '@aws-amplify/core';
import { Observable } from 'rxjs';
import { environment } from 'src/environments/environment';
import { UserService } from '../user/user.service';

const subscribeDoc = /* GraphQL */ `
  subscription Subscribe($name: String!) {
    subscribe(name: $name) {
      data
      name
    }
  }
`;

const publishDoc = /* GraphQL */ `
  mutation Publish($data: AWSJSON!, $name: String!) {
    publish(data: $data, name: $name) {
      data
      name
    }
  }
`;

@Injectable({
  providedIn: 'root'
})
export class AmplifyService {
  public connectionState: string;
  private connectionCallback: CallableFunction;

  constructor(
    private user: UserService,
  ) {}

  public async configure() {
    Amplify.configure({
      aws_appsync_graphqlEndpoint: environment.aws.endpoint,
      aws_appsync_region: environment.aws.region,
      aws_appsync_authenticationType: environment.aws.authType,
      aws_appsync_apiKey: 'null',
    });
    Hub.listen('api', ({ payload: { event, data } }) => {
      if (event === CONNECTION_STATE_CHANGE) {
        this.connectionState = data.connectionState;
        if (this.connectionCallback) {
          this.connectionCallback(data.connectionState);
        }
      }
    });
  }

  public setConnectionCallback(callback?: CallableFunction) {
    this.connectionCallback = callback;
  }

  public async publish(name, data, waitReady = true) {
    if (waitReady) {
      await this.isReady();
    }
    await this.user.isReady();
    const tokenResult = await this.user.user.getIdTokenResult();
    Cache.setItem('federatedInfo', {
      token: tokenResult.token,
    });

    return await API.graphql(graphqlOperation(publishDoc, { name, data }));
  }

  public async subscribe(name, next, error) {
    await this.user.isReady();
    const tokenResult = await this.user.user.getIdTokenResult();
    Cache.setItem('federatedInfo', {
      token: tokenResult.token,
    });

    const graphql = await API.graphql(graphqlOperation(subscribeDoc, { name })) as Observable<any>;

    return graphql.subscribe({
      next: ({ provider, value }) => {
          next(value.data.subscribe, provider, value);
      },
      error: error || console.log,
    });
  }

  public async isReady(): Promise<void> {
    if (this.connectionState === ConnectionState.Connected) {
      return;
    }

    return new Promise((resolve) => {
      let retry = 0;
      const interval = setInterval(async () => {
        if (this.connectionState === ConnectionState.Connected || retry > 30) {
          clearInterval(interval);
          resolve();
        }
        retry++;
      }, 1000);
    });
  }
}
