import {Deferred} from '@sail/utils';
import RpcChannel from 'src/internal/rpc-channel/channel';
import {createRpcHandshake} from 'src/internal/rpc-handshake/handshake';
import MessageChannel from 'src/internal/transport/MessageChannel';
import fromMessagePort from 'src/internal/transport/fromMessagePort';
import listen from 'src/internal/utils/listen';
import {v4 as uuid} from 'uuid';

import type {Encoder} from 'src/internal/encoder/types';
import type {ChannelConfig, RPCMethods} from 'src/internal/rpc-channel/types';
import type {Transport} from 'src/internal/transport/types';

/**
 * Handle returned by `initClient`. All members are read-only.
 */
export type RpcClient<TMethods extends RPCMethods> = {
  /** Identifier of this client. */
  readonly id: string;
  /** Has the same signature as `RpcChannel#call`. */
  readonly call: RpcChannel<TMethods>['call'];
  /** Has the same signature as `RpcChannel#close`. */
  readonly close: RpcChannel<TMethods>['close'];
};
/**
 * Handle returned by `initServer`. All members are read-only.
 */
export type RpcServer = {
  /** Identifier of this server. */
  readonly id: string;
  /** Starts shutting the server down; the promise settles when it is done. */
  readonly close: () => Promise<void>;
};

/**
 * Builds the two entry points of an RPC link scoped to `namespace`:
 * `initServer` and `initClient`.
 *
 * `initServer` starts a server that watches the given `transport` for
 * connecting clients and keeps one dedicated endpoint per connected client.
 * Clients from a different `namespace` will be ignored.
 *
 * `initClient` starts a client that looks for a server on the same
 * `namespace`. When the server it is connected to announces that it is
 * closing, the client disconnects and starts looking for another server.
 *
 * `methods` is the set of methods exposed by every endpoint the server
 * creates. Because calls travel over RPC, callers always receive promises,
 * even when the underlying method is synchronous.
 *
 * The supplied `transport` only carries the handshake. Once a client and a
 * server have found each other they talk over a private `MessageChannel`, so
 * RPC traffic neither pollutes the `transport` nor conflicts with other
 * messages sent through it.
 */
export default function createRpc<TMethods extends RPCMethods>({
  namespace,
  methods,
  encoder,
}: {
  namespace: string;
  methods?: ChannelConfig<TMethods, {}>['methods'];
  encoder?: Encoder;
}): {
  initServer: (options: {transport: Transport}) => RpcServer;
  initClient: (options: {transport: Transport}) => RpcClient<TMethods>;
} {
  const {
    // Message senders.
    syn,
    synAck,
    ack,
    fin,
    synServer,
    finServer,
    // Message type guards.
    isSynEvent,
    isSynAckEvent,
    isAckEvent,
    isFinEvent,
    isSynServerEvent,
    isFinServerEvent,
  } = createRpcHandshake(namespace);

  const initServer = ({transport}: {transport: Transport}): RpcServer => {
    const serverId = uuid();
    const endpoints = new Map<string, RpcChannel<TMethods>>();
    let closing: Deferred<void> | null = null;

    // Handshake with connecting clients. The two checks are deliberately
    // independent rather than mutually exclusive.
    const handshakeListener = listen(
      transport,
      (message: MessageEvent<any>) => {
        const event = message.data;

        if (isSynEvent(event)) {
          synAck(transport, {clientId: event.data.clientId, serverId});
        }

        // Only accept acknowledgements addressed to this server.
        if (isAckEvent(event) && event.data.serverId === serverId) {
          const {clientId, port} = event.data;
          const endpoint = new RpcChannel<TMethods>({
            name: namespace,
            transport: fromMessagePort(port),
            methods,
            encoder,
          });

          endpoints.set(clientId, endpoint);
          port.start();
        }
      },
    );

    // Resolves a pending `close()` once no client endpoints remain.
    const settleIfDrained = () => {
      if (closing == null || endpoints.size !== 0) {
        return;
      }

      disconnectListener.stop();
      closing.resolve();
    };

    // Tear down the endpoint of a client that announces it is closing.
    const disconnectListener = listen(
      transport,
      (message: MessageEvent<any>) => {
        const event = message.data;

        if (!isFinEvent(event)) {
          return;
        }

        const {clientId} = event.data;

        // Both calls are no-ops when the client is unknown to this server.
        endpoints.get(clientId)?.close();
        endpoints.delete(clientId);

        settleIfDrained();
      },
    );

    // Tell clients that a server is now available.
    synServer(transport, {serverId});

    return {
      get id() {
        return serverId;
      },
      close() {
        // Repeated calls share the same deferred.
        const pending = closing ?? new Deferred<void>();
        closing = pending;

        // No new clients from here on.
        handshakeListener.stop();

        // Tell connected clients that this server is going away.
        finServer(transport, {serverId});

        // Resolves right away when nobody is connected; otherwise the
        // promise stays pending until the last client has disconnected.
        settleIfDrained();

        return pending.promise;
      },
    };
  };

  const initClient = ({
    transport,
  }: {
    transport: Transport;
  }): RpcClient<TMethods> => {
    const clientId = uuid();
    let client = createClient<TMethods>(namespace, encoder);
    let serverId: string | null = null;

    // Broadcasts a connection request on the shared transport.
    const requestConnection = () => {
      syn(transport, {clientId});
    };

    // Active only while the client is looking for a server.
    const discoveryListener = listen(
      transport,
      (message: MessageEvent<any>) => {
        const event = message.data;

        if (isSynAckEvent(event) && event.data.clientId === clientId) {
          const offer = event.data;

          // A server answered us: stop handling further handshake events.
          discoveryListener.stop();

          serverId = offer.serverId;

          ack(
            transport,
            {
              clientId: offer.clientId,
              serverId: offer.serverId,
              port: client.port,
            },
            // Hand ownership of the port over to the server.
            [client.port],
          );
        }

        // A server announced itself while we are still searching: ask it
        // for a connection.
        if (isSynServerEvent(event)) {
          requestConnection();
        }
      },
    );

    // Reacts to the connected server announcing that it is closing.
    const shutdownListener = listen(
      transport,
      (message: MessageEvent<any>) => {
        const event = message.data;

        if (!isFinServerEvent(event) || serverId !== event.data.serverId) {
          return;
        }

        serverId = null;

        // Disconnect from the closing server.
        fin(transport, {clientId});

        // Replace the endpoint and port pair with a fresh one.
        client.endpoint.close();
        client = createClient<TMethods>(namespace, encoder);

        // Resume the search for a server.
        discoveryListener.start();
        requestConnection();
      },
    );

    requestConnection();

    return {
      get id() {
        return clientId;
      },
      call(method, ...args) {
        // Always goes through the current endpoint, which is replaced on
        // reconnect.
        return client.endpoint.call(method, ...args);
      },
      close() {
        // Tell the server this client is going away.
        fin(transport, {clientId});

        // Release listeners and the local endpoint.
        discoveryListener.stop();
        shutdownListener.stop();
        client.endpoint.close();
      },
    };
  };

  return {initServer, initClient};
}

/**
 * Creates the client half of a private connection.
 *
 * A new `MessageChannel` is opened. Its first port is started and wrapped in
 * an `RpcChannel` named `source`; its second port is returned untouched so
 * the caller can pass it on.
 *
 * @param source - Name given to the created `RpcChannel`.
 * @param encoder - Optional encoder forwarded to the `RpcChannel`.
 * @returns The local `endpoint` together with the other `port` of the pair.
 */
function createClient<TMethods extends RPCMethods>(
  source: string,
  encoder?: Encoder,
) {
  const channel = new MessageChannel();
  const localPort = channel.port1;
  const remotePort = channel.port2;

  // Start the local port before wrapping it in the RPC channel.
  localPort.start();

  const endpoint = new RpcChannel<TMethods>({
    name: source,
    transport: fromMessagePort(localPort),
    encoder,
  });

  return {endpoint, port: remotePort};
}
