import {
  HttpClient,
  HttpDownloadProgressEvent,
  HttpErrorResponse,
  HttpEvent,
  HttpEventType,
  HttpResponse,
} from '@angular/common/http';
import { Injectable } from '@angular/core';
import {
  delay,
  Observable,
  repeatWhen,
  retryWhen,
  Subscriber,
  Subscription,
  takeWhile,
  tap,
} from 'rxjs';
import { SseErrorEvent } from './sse-error-event.interface';
import { defaultSseOptions, SseOptions } from './sse-options.interface';

/**
 * Server-Sent Events client built on top of Angular's `HttpClient`.
 *
 * The request is issued with progress reporting enabled and the partial response text is parsed
 * into `MessageEvent`s. Request failures and request completion are reported to the consumer as
 * `ErrorEvent`s of type `error`.
 *
 * The service is provided in root, so all parser state is kept per subscription rather than on
 * the instance; otherwise concurrent streams would corrupt each other.
 */
@Injectable({ providedIn: 'root' })
export class SseClient {
  /** Character separating a field name from its value inside an event line. */
  private static readonly SEPARATOR = ':';

  /**
   * Two consecutive line breaks terminate an event. The capturing group is intentional: it makes
   * `String.prototype.split` emit the separator as a blank part, which triggers the dispatch.
   */
  private static readonly EVENT_BOUNDARY = /(\r\n|\r|\n){2}/;

  /** A single line break in any of the three supported flavours. */
  private static readonly LINE_BREAK = /\r\n|\r|\n/;

  constructor(private readonly http: HttpClient) {}

  /**
   * Opens an event stream.
   *
   * Every subscription to the returned Observable issues its own HTTP request and owns its own
   * parser state. Unsubscribing cancels the underlying request.
   *
   * @param url address of the event stream.
   * @param options stream options; merged over `defaultSseOptions`.
   * @returns an Observable emitting a `MessageEvent` per received event and an `ErrorEvent` of
   * type `error` whenever a request fails or finishes. It errors when a failure carries no HTTP
   * status and completes once no further reconnection will be attempted.
   */
  public stream(url: string, options: SseOptions): Observable<Event> {
    // Resolved once per call and captured by the closure, so a later `stream()` call cannot
    // change the options of an Observable that has not been subscribed yet.
    const sseOptions: SseOptions = Object.assign({}, defaultSseOptions, options);

    return new Observable<Event>(observer => {
      const state: StreamState = { progress: 0, chunk: '' };
      const subscription = this.subscribeStreamRequest(url, sseOptions, observer, state);
      return () => subscription.unsubscribe();
    });
  }

  /**
   * Issues the HTTP request and wires reconnection, parsing and termination.
   *
   * NOTE(review): `repeatWhen` / `retryWhen` are flagged as deprecated by recent RxJS versions in
   * favour of `repeat({ delay })` / `retry({ delay })` - consider migrating.
   */
  private subscribeStreamRequest(
    url: string,
    options: SseOptions,
    observer: Subscriber<Event>,
    state: StreamState
  ): Subscription {
    return this.http
      .get(url, { observe: 'events', responseType: 'text', reportProgress: true })
      .pipe(
        repeatWhen(completed =>
          this.repeatWhen(completed, options.keepAlive, options.reconnectionDelay)
        ),
        retryWhen(errors =>
          this.retryWhen(errors, options.keepAlive, options.reconnectionDelay, observer)
        )
      )
      .subscribe({
        next: event => this.parseStreamEvent(event, observer, state),
        // Without these two handlers the consumer would never learn that the stream is over.
        error: error => observer.error(error),
        complete: () => observer.complete(),
      });
  }

  /**
   * Notifier for `repeatWhen`: re-subscribes after `reconnectionDelay` while `keepAlive` is set,
   * otherwise completes so that the request is not repeated.
   */
  private repeatWhen(
    completed: Observable<any>,
    keepAlive: boolean,
    reconnectionDelay: number
  ): Observable<any> {
    return completed.pipe(
      takeWhile(() => keepAlive),
      delay(reconnectionDelay)
    );
  }

  /**
   * Notifier for `retryWhen`: reports every failure to the consumer, then retries after
   * `reconnectionDelay` while `keepAlive` is set, otherwise completes.
   */
  private retryWhen(
    attempts: Observable<any>,
    keepAlive: boolean,
    reconnectionDelay: number,
    observer: Subscriber<Event>
  ): Observable<any> {
    return attempts.pipe(
      tap(error => this.handleRequestError(error, observer)),
      takeWhile(() => keepAlive),
      delay(reconnectionDelay)
    );
  }

  /**
   * Emits an `error` event describing the failure. Failures without a `status` are additionally
   * forwarded through the error channel of the consumer.
   */
  private handleRequestError(error: HttpErrorResponse, observer: Subscriber<Event>): void {
    this.dispatchStreamData(this.errorEvent(error), observer);

    if (error.status === undefined || error.status === null) {
      observer.error(error);
    }
  }

  /** Routes an `HttpEvent` of the stream request to the matching handler. */
  private parseStreamEvent(
    event: HttpEvent<string>,
    observer: Subscriber<Event>,
    state: StreamState
  ): void {
    switch (event.type) {
      case HttpEventType.Sent:
        // A new request starts from scratch: text left over from a failed request must not be
        // glued to the first event of the next one.
        state.progress = 0;
        state.chunk = '';
        break;
      case HttpEventType.DownloadProgress:
        this.onStreamProgress((event as HttpDownloadProgressEvent).partialText, observer, state);
        break;
      case HttpEventType.Response:
        this.onStreamCompleted(event as HttpResponse<string>, observer, state);
        break;
    }
  }

  /**
   * Consumes the part of the response text that was not seen yet and dispatches every event that
   * is terminated by a blank line.
   *
   * @param data the response text received so far (not only the newest part).
   */
  private onStreamProgress(
    data: string | null | undefined,
    observer: Subscriber<Event>,
    state: StreamState
  ): void {
    const received = data?.substring(state.progress) ?? '';
    if (received.length === 0) return;
    state.progress += received.length;

    // Scan the pending text together with the new text, so that an event boundary split across
    // two network chunks ("...\n" followed by "\n...") is still recognised.
    const text = state.chunk + received;
    state.chunk = '';
    for (const part of text.split(SseClient.EVENT_BOUNDARY)) {
      this.parseEventData(part, observer, state);
    }
  }

  /**
   * Handles the end of a request: consumes the remaining text, flushes a trailing event that was
   * not terminated by a blank line, resets the parser state and emits an `error` event without
   * details.
   */
  private onStreamCompleted(
    response: HttpResponse<string>,
    observer: Subscriber<Event>,
    state: StreamState
  ): void {
    this.onStreamProgress(response.body, observer, state);
    this.dispatchStreamData(this.parseEventChunk(state.chunk), observer);

    state.chunk = '';
    state.progress = 0;

    this.dispatchStreamData(this.errorEvent(), observer);
  }

  /**
   * Processes one part produced by splitting on `EVENT_BOUNDARY`: a blank part marks the end of
   * the pending event, anything else is appended to it.
   */
  private parseEventData(part: string, observer: Subscriber<Event>, state: StreamState): void {
    if (part.trim().length === 0) {
      this.dispatchStreamData(this.parseEventChunk(state.chunk), observer);
      state.chunk = '';
    } else {
      state.chunk += part;
    }
  }

  /**
   * Builds a `MessageEvent` from the raw text of one event.
   *
   * @returns the event, or `undefined` when `chunk` is empty.
   */
  private parseEventChunk(chunk: string): MessageEvent | undefined {
    if (!chunk) return undefined;

    const chunkEvent: ChunkEvent = { id: undefined, data: '', event: 'message' };
    for (const line of chunk.split(SseClient.LINE_BREAK)) {
      this.parseChunkLine(line.trim(), chunkEvent);
    }

    return this.messageEvent(chunkEvent.event, {
      lastEventId: chunkEvent.id,
      data: chunkEvent.data,
    });
  }

  /**
   * Applies a single `field: value` line to `event`.
   *
   * Lines without a separator, lines starting with the separator and fields other than the keys
   * already present on `event` are ignored. Values of repeated `data` lines are concatenated
   * without any separator.
   */
  private parseChunkLine(line: string, event: ChunkEvent): void {
    const index = line.indexOf(SseClient.SEPARATOR);
    if (index <= 0) return;

    const field = line.substring(0, index);
    // Own keys only, so that names such as "constructor" can never be written.
    if (!Object.keys(event).includes(field)) return;

    const value = line.substring(index + 1).trim();
    event[field] = field === 'data' ? event.data + value : value;
  }

  /** Emits `event` to the consumer unless it is missing or carries no data. */
  private dispatchStreamData(event: Event | undefined, observer: Subscriber<Event>): void {
    if (!this.validEvent(event)) return;
    observer.next(event);
  }

  /**
   * Tells whether `event` is worth emitting: `error` events always are, any other event only
   * when its `data` is non-empty.
   */
  private validEvent(event: Event | undefined): event is Event {
    if (!event) return false;
    if (event.type === 'error') return true;

    const data = (event as MessageEvent).data;
    return !!data && !!data.length;
  }

  /** Creates a `MessageEvent` of the given type. */
  private messageEvent(type: string, options: MessageEventInit): MessageEvent {
    return new MessageEvent(type, options);
  }

  /**
   * Creates an `ErrorEvent` of type `error`. Details are attached only when `error` is an
   * `HttpErrorResponse`.
   */
  private errorEvent(error?: unknown): Event {
    let eventData: Partial<SseErrorEvent> | undefined;

    if (error instanceof HttpErrorResponse) {
      eventData = {
        error,
        status: error.status,
        statusText: error.statusText,
        message: error.message,
      };
    }

    return new ErrorEvent('error', eventData);
  }
}

/** Mutable parser state owned by a single subscription to `SseClient.stream`. */
interface StreamState {
  /** Number of characters of the response text that were already consumed. */
  progress: number;
  /** Text of the event that is currently being received (no terminating blank line yet). */
  chunk: string;
}

/**
 * Accumulator for the fields of one event while its lines are being parsed.
 *
 * `event` starts out as `'message'` but is overwritten with whatever event name the server
 * sends, hence it is typed as `string` rather than as the literal `'message'`.
 * The index signature allows the parser to assign a field by its name.
 */
type ChunkEvent = { id: string | undefined; data: string; event: string; [key: string]: any };
