import { Injectable } from '@angular/core';
import { Observable, Observer } from 'rxjs';
import { AnonymousSubject } from 'rxjs/internal/Subject';
import { Subject, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { environment } from 'src/environments/environment';
import { KeycloakService } from 'keycloak-angular';


export interface Message {
  type?: string
  messageId: string
  payload?: any,
  properties?: { [id: string]: string },
  context?: string
}

@Injectable()
export class PulsarService {
  private subject: AnonymousSubject<MessageEvent>;
  public messages: Subject<Message>;

  constructor(private keycloakService: KeycloakService) { }

  public initConnection(topic, subscription, queryParam?: { [id: string]: string }) {
    let param = ""
    queryParam = {...(queryParam?queryParam:{}), ...{token: this.keycloakService.getKeycloakInstance().token}};
    if (queryParam) {
      param = "?"
      let keys = Object.keys(queryParam)
      keys.forEach(key => {
        param += `${key}=${queryParam[key]}&`
      })
      param = param.slice(0, -1)
    }

    this.messages = <Subject<Message>>this.connect(`${environment.pulsarUrl}${topic}/${subscription}${param}`).pipe(
      map(
        (response: MessageEvent): Message => {
          let data = JSON.parse(response.data)
          return data;
        }
      ),
      catchError(err => {
        return of(1)
      })
    );

    return this.messages
  }

  public connect(url): AnonymousSubject<MessageEvent> {
    this.subject = this.create(url);
    return this.subject;
  }

  private create(url): AnonymousSubject<MessageEvent> {
    let ws = new WebSocket(url);
    let observable = new Observable((obs: Observer<MessageEvent>) => {
      ws.onmessage = obs.next.bind(obs);
      ws.onerror = obs.error.bind(obs);
      ws.onclose = obs.complete.bind(obs);
      return ws.close.bind(ws);
    });
    let observer = {
      error: (err) => { console.log("PULSAR ERROR", err) },
      complete: () => { console.log("COMPLETE PULSAR") },
      next: (data: Object) => {
        do{
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify(data));
          }
        }while(ws.readyState !== WebSocket.OPEN)  
        
      }
    };
    return new AnonymousSubject<MessageEvent>(observer, observable);
  }
}
