// EXTERNAL
import {EventSourcePolyfill} from 'event-source-polyfill';
//
// INTERNAL
import authenticationService, {AuthorizationHeader} from '@services/authentication.service';
import {logger} from '@services/logger.service';
import {Observable, Subscriber, timer} from 'rxjs';
import {mergeMap, retryWhen} from 'rxjs/operators';
import {notNullOrUndefined} from "shared-frontend/dist/utils/functional.utils";

type EventSourceListener = {
    eventType: string;
    eventHandler: (data: any) => any;
};

interface SseEvent {
    eventType: string;
    data: any;
}

export default class EventSource {

    public static createObservable<T extends SseEvent>(url: string, eventTypes: string[]): Observable<T> {
        let lastEventId = null;

        return new Observable<T>((subscriber: Subscriber<SseEvent>) => {

            const eventTypeListeners = eventTypes
                .map((eventType: string) => {
                    // logger.debug(`[eventSource.domain] - registering event listener on ${url} for eventType ${eventType}`);
                    return {
                        eventType,
                        eventHandler: (event: any) => {
                            if(notNullOrUndefined(event.lastEventId)) lastEventId = event.lastEventId;
                            EventSource.onEvent(event, url, eventType, (data => subscriber.next({eventType: eventType, data: data})))
                        }
                    }
                });

            const listeners: EventSourceListener[] = eventTypeListeners
                .concat(
                    {
                        eventType: 'error', eventHandler: event => {
                            if (event.readyState === EventSourcePolyfill.CLOSED) {
                                logger.debug(`[eventSource.domain] - SSE is closed for url: : ${url}`, event);
                                subscriber.complete();
                            } else {
                                logger.warn(`[eventSource.domain] - Error occurred in SSE for url: : ${url}`, event);
                                subscriber.error(event);
                            }
                        }
                    },
                    {
                        eventType: 'message', eventHandler: event => {
                            logger.warn(`[eventSource.domain] - received a message with an unknown event type on URL: ${url}`, event);
                        }
                    },
                );


            const requestURL = notNullOrUndefined(lastEventId)? `${url}${url.indexOf("?") === -1 ? "?" : "&"}lastEventId=${encodeURIComponent(lastEventId)}`  : url
            const ssePromise = EventSource.openConnection<T>(requestURL, listeners);
            return () => ssePromise.then((sse) => EventSource.closeConnection(sse, listeners, url))
        }).pipe(
            // retry is needed to try again when access token is expired
            retryWhen((errors) =>
                errors
                    .pipe(
                        mergeMap((e, cnt) => {
                                const waitTime = Math.max(cnt, 30)
                                if (e.status === 401) {
                                    logger.info(`[eventSource.domain] - access token is expired, restarting the SSE in ${waitTime}s: ${url}`);
                                    return timer(waitTime * 1000)
                                } else if (e.target && e.target.readyState === EventSourcePolyfill.CONNECTING) {
                                    logger.info(`[eventSource.domain] - received error while connecting, restarting the SSE in ${waitTime}s: ${url}`, e);
                                    return timer(waitTime * 1000)
                                } else {
                                    logger.info(`[eventSource.domain] - received unexpected error, restarting the SSE in ${waitTime}s: ${url}`, e);
                                    return timer(waitTime * 1000)
                                    // return throwError(e)
                                }
                            }
                        ))
            )
        )
    };

    private static onEvent(event: any, url: string, eventType: string, onData: (data:any) => void): any {
        try {
            if (event?.data) {
                const data = JSON.parse(event.data);
                logger.info(`[eventSource.domain] - Incoming SSE event on ${url} of type: ${eventType} with id ${event.lastEventId}`, data);
                onData(data)
            } else {
                logger.warn(`[eventSource.domain] - Incoming SSE event on ${url} of type, with NO data: ${eventType}`, event);
            }
        } catch (err) {
            logger.warn(`[eventSource.domain] - eventListener: Error when handling SSE data on ${url}: ${eventType}`, event, err);
        }
    }


    private static async openConnection<T extends SseEvent>(
        url: string,
        listeners: EventSourceListener[]
    ): Promise<EventSourcePolyfill> {
        const token = await EventSource._getAuthorization();
        logger.info(`[eventSource.domain] - Opening SSE connection on url: ${url}`);
        const sse = new EventSourcePolyfill(url, {headers: {...token}});

        listeners.forEach((listener: EventSourceListener) => {
            sse.addEventListener(listener.eventType, listener.eventHandler);
        });

        return sse;
    };

    private static async closeConnection(sse: EventSourcePolyfill, listeners: EventSourceListener[], url: string) {
        logger.info(`[eventSource.domain] - Closing SSE connection for url ${url}`);
        listeners.forEach(listener => {
            sse.removeEventListener(listener.eventType, listener.eventHandler)
        })
        sse.close();
    }

    private static async _getAuthorization(): Promise<AuthorizationHeader> {
        return await authenticationService.getAuthorizationHeader();
    }
}
