import { Injectable } from '@angular/core';
import { AppConfig } from '@app/app.config';
import {
  catchError,
  filter,
  finalize,
  firstValueFrom,
  Observable,
  retry,
  Subject,
  Subscription,
  switchMap,
  take,
  throwError,
  timeout,
  timer,
} from 'rxjs';
import { SseClient } from '../sse/sse-client.service';
import { ChannelsService } from './channels.service';
import { EventType } from './event-type.enum';

/**
 * Root-scoped client for the notification event stream.
 *
 * Opens a single stream through `SseClient` and fans incoming events out in two ways:
 * - by event `type`, through {@link EventsService.listen};
 * - by channel ID (matched against the event's `lastEventId`), through {@link EventsService.channel}.
 */
@Injectable({ providedIn: 'root' })
export class EventsService {
  /** Subscription to the active event stream; `undefined` while disconnected. */
  private subscription?: Subscription;

  /** Re-broadcasts every non-error event received from the stream. */
  private readonly onmessage$ = new Subject<MessageEvent>();

  /** Channel subjects keyed by channel ID. */
  private readonly channels = new Map<string, Subject<any>>();

  constructor(private sseClient: SseClient, private readonly channelsService: ChannelsService) {}

  /**
   * Opens the event stream, replacing any previously opened one.
   *
   * Non-error events are pushed to `listen()` consumers and, when a channel is registered under
   * the event's `lastEventId`, their JSON-decoded payload is pushed to that channel.
   */
  connect(): void {
    const url = `${AppConfig.API_URL_NOTIFICATION}/eventos/source`;

    // Drop the previous stream first so there is never more than one live connection.
    this.subscription?.unsubscribe();
    this.subscription = this.sseClient
      .stream(url, { keepAlive: true, reconnectionDelay: 5_000 })
      .subscribe(event => {
        if (this.isMessageEvent(event)) {
          this.onmessage$.next(event);
          // The payload is only parsed when a channel is registered for this ID.
          this.channels.get(event.lastEventId)?.next(JSON.parse(event.data));
          return;
        }

        // Only errors carrying a positive status are logged.
        // NOTE(review): presumably a status of 0 / missing means a dropped connection that the
        // client retries on its own - confirm against SseClient.
        const errorEvent = event as ErrorEvent;
        if (errorEvent.error?.status > 0) {
          console.error(errorEvent.message, errorEvent.error);
        }
      });
  }

  /** Closes the event stream, if one is open. */
  disconnect(): void {
    this.subscription?.unsubscribe();
    this.subscription = undefined;
  }

  /**
   * Listens to stream events of a given type.
   *
   * @param type Event type to match against `MessageEvent.type`.
   * @returns An observable emitting the JSON-decoded `data` of each matching event.
   */
  listen<T extends object = any>(type: EventType): Observable<T> {
    return new Observable<T>(observer => {
      const subscription = this.onmessage$
        .pipe(filter(event => event.type === type))
        .subscribe(event => {
          observer.next(JSON.parse(event.data) as T);
        });

      return () => subscription.unsubscribe();
    });
  }

  /**
   * Listens to the events of a channel.
   * - Subscribes through `ChannelsService` and listens to the channel's events.
   * - If no channel is registered locally under the ID, a new one is created.
   *
   * The channel is resolved when the returned observable is subscribed to, not when this method
   * is called, so an observable that is never subscribed leaves no channel behind. Once the last
   * consumer unsubscribes, the channel is torn down.
   *
   * @param channelId UUID of the channel.
   * @returns An observable emitting the JSON-decoded payload of each event of the channel.
   */
  channel<T extends object = any>(channelId: string): Observable<T> {
    return new Observable<T>(subscriber => {
      const source: Observable<T> =
        this.channels.get(channelId)?.asObservable() ?? this.createChannel<T>(channelId);

      // Teardown is deferred to a microtask so the subject's `observed` flag already reflects
      // this consumer's removal when it is checked.
      return source
        .pipe(finalize(() => queueMicrotask(() => this.channelTeardown(channelId))))
        .subscribe(subscriber);
    });
  }

  /**
   * Registers a new subject for the channel and returns an observable that first subscribes
   * through `ChannelsService` (retrying up to 3 times, waiting `retryAttempt * 2_000` ms before
   * each retry) and then relays the subject's events.
   *
   * If subscribing still fails, the subject is unregistered and the error is rethrown.
   */
  private createChannel<T extends object = any>(channelId: string): Observable<T> {
    // Keep a local reference: the map entry may be removed while the request is in flight.
    const subject = new Subject<T>();
    this.channels.set(channelId, subject);

    return this.channelsService.subscribe(channelId).pipe(
      retry({
        count: 3,
        delay: (_, retryAttempt: number) => timer(retryAttempt * 2_000),
        resetOnSuccess: true,
      }),
      catchError(reason => {
        // Only unregister the subject created by this call.
        if (this.channels.get(channelId) === subject) {
          this.channels.delete(channelId);
        }
        return throwError(() => reason);
      }),
      switchMap(() => subject.asObservable())
    );
  }

  /**
   * Releases a channel once nobody is listening to it: completes and unregisters its subject and
   * sends a best-effort unsubscribe through `ChannelsService`.
   *
   * Does nothing when the channel is still observed or is no longer registered, which avoids
   * sending duplicate unsubscribe requests.
   */
  private channelTeardown(channelId: string): void {
    const channel = this.channels.get(channelId);
    if (!channel || channel.observed) return;

    channel.complete();
    this.channels.delete(channelId);

    // Fire-and-forget. Failures and timeouts are swallowed by `catchError`, which completes the
    // source without a value; `defaultValue` keeps `firstValueFrom` from rejecting with
    // `EmptyError` in that case.
    void firstValueFrom(
      this.channelsService.unsubscribe(channelId).pipe(
        take(1),
        timeout(15_000),
        catchError(() => [])
      ),
      { defaultValue: undefined }
    );
  }

  /** Type guard: every event whose type is not `'error'` is treated as a message event. */
  private isMessageEvent(event: Event): event is MessageEvent {
    return event.type !== 'error';
  }
}
