// Copyright 2020-2024 Luminary Cloud, Inc. All Rights Reserved.

import { Message } from '@bufbuild/protobuf';
import {
  CallOptions,
  Code,
  ConnectError,
  Interceptor,
  PromiseClient,
  StreamRequest,
  UnaryRequest,
  createPromiseClient,
} from '@connectrpc/connect';
import { createGrpcWebTransport } from '@connectrpc/connect-web';

import { GeometryService } from '../proto/api/v0/luminarycloud/geometry/geometry_connect';
import * as basepb from '../proto/base/base_pb';
import { Frontend } from '../proto/frontend/frontend_connect';
import { ServerInfo } from '../proto/frontend/serverinfo/serverinfo_connect';
import { SCHEMA_VERSION, SCHEMA_VERSION_HEADER_NAME } from '../schemaVersion';

import { AuthState } from './AuthInfoCallback';
import { getAuthInfoV2, setAuthInfoV2 } from './AuthInfoCallbackV2';
import * as RuntimeParams from './RuntimeParams';
import { getSessionStorageData } from './browserStorage';
import { ERRORS } from './errors';
import * as jwt from './jwt';
import { Logger } from './observability/logs';
import { isProjectAccessDeniedError, isProjectDeletedError } from './projectUtilsErrors';
import { debugInterceptors } from './rpcDebugging';
import sleep from './sleep';
import * as status from './status';
import { addRpcError } from './transientNotification';

// Logger instance for this module, created with the name 'rpc'.
const logger = new Logger('rpc');

// We try to reload the browser tab when we get an incompatible client message from the backend.
// However, it seems that this process is not reliable for some clients, so we limit the number of
// reloads. The number of reloads that have happened is kept in session storage (see
// maxReloadStorage below); the "LOCAL_STORAGE" part of the key constant's name does not match the
// storage that is actually used.
const RELOAD_COUNT_LOCAL_STORAGE_KEY = 'reloadCountIncompatibleFrontend';
// Limit on the stored reload count; once the count is greater than this value we stop reloading
// and surface an error instead (see handleIncompatibleFrontend).
const MAX_RELOAD_COUNT = 12;
// Storage object that the reload counter is written to.
const maxReloadStorage = sessionStorage;

// Transport interceptor that decorates every outgoing request with:
//  - an `Authorization: Bearer <token>` header, when a non-empty session token is present in
//    localStorage under jwt.sessionKey, and
//  - the frontend schema version header, unconditionally.
const authInterceptor: Interceptor = (next) => {
  const withHeaders = (req: UnaryRequest | StreamRequest) => {
    const sessionToken = localStorage.getItem(jwt.sessionKey);
    // A missing (null) or empty token means there is no session; send no auth header then.
    if (sessionToken) {
      req.header.set('Authorization', `Bearer ${sessionToken}`);
    }
    req.header.set(SCHEMA_VERSION_HEADER_NAME, SCHEMA_VERSION);
    return next(req);
  };
  return withHeaders;
};

// Interceptors installed on both transports below: the auth/schema-version interceptor is listed
// first, followed by the debugging interceptors.
const interceptors = [
  authInterceptor,
  ...debugInterceptors,
];

// grpc-web transport for the rpc client, pointed at the analyzer server.
// NOTE(review): `credentials: 'include'` presumably makes the browser send cookies on
// cross-origin requests — confirm against the deployment setup.
const transport = createGrpcWebTransport({
  baseUrl: RuntimeParams.analyzerServer,
  useBinaryFormat: true,
  credentials: 'include',
  interceptors,
});

// Same but for the geometry services. This transport is created unconditionally, even when
// RuntimeParams.geometryServer is not set.
const transportGeometry = createGrpcWebTransport({
  baseUrl: RuntimeParams.geometryServer,
  useBinaryFormat: true,
  credentials: 'include',
  interceptors,
});

/**
 * Singleton RPC client for the frontend services. It uses the analyzer-server transport.
 */
export const client = createPromiseClient(Frontend, transport);

/**
 * Singleton RPC client for the frontend.serverinfo service. It shares the analyzer-server
 * transport with `client`.
 */
export const clientServerInfo = createPromiseClient(ServerInfo, transport);

/**
 * Singleton RPC client for the geometry services. It is null when RuntimeParams.geometryServer is
 * not set (falsy), so callers must handle the null case.
 */
export const clientGeometry = RuntimeParams.geometryServer ?
  createPromiseClient(GeometryService, transportGeometry) : null;

/**
 * Computes how long to sleep, in milliseconds, after an RPC has failed n consecutive times.
 *
 * The delay grows geometrically and is capped: min(5000, 1000 * (1.2**n)).
 * @param value the number of consecutive failures so far (n)
 * @returns the delay in milliseconds
 */
function backoffMs(value: number): number {
  const baseDelayMs = 1000;
  const maxMultiplier = 5.0;
  const growth = 1.2 ** value;
  return baseDelayMs * Math.min(growth, maxMultiplier);
}

// True when the error's status code is Unauthenticated.
const isAuthError = (err: ConnectError) => {
  const { code } = err;
  return code === Code.Unauthenticated;
};

// Resets the reload counter for incompatible frontend client errors to '0'. The counter lives in
// maxReloadStorage (session storage, not localStorage). This must be reset each time we receive
// an OK response from the server.
function restartIncompatibleFrontend(): void {
  maxReloadStorage.setItem(RELOAD_COUNT_LOCAL_STORAGE_KEY, '0');
}

// Checks if the RPC error is related to the frontend schema version being incompatible with the
// backend. If that is the case, it reloads the window to try to force a refetch of the latest
// bundle. For any other error this function does nothing.
//
// The number of automatic reloads is bounded: once the stored reload count is greater than
// MAX_RELOAD_COUNT, an RPC error notification is raised and an Error is thrown instead of
// reloading.
function handleIncompatibleFrontend(err: ConnectError): void {
  const payload = status.getPayload(err);
  const errContainsIncompatible = err.message.includes('Incompatible frontend client version');
  const isIncompatible = (
    errContainsIncompatible ||
    payload?.subcode === basepb.Subcode.INCOMPATIBLE_CLIENT
  );
  if (!isIncompatible) {
    return;
  }

  // Clear the browser cache to try to force the browser to refresh the app.bundle.js. when
  // running the frontend with localhost. When using the product with a lc-environment, the
  // pageversion query parameter in index.html changes when the jobmaster image is updated, so the
  // browser cache should get automatically invalidated.
  const isLuminaryHost = /^.*\.luminarycloud\.com$/.test(window.location.hostname);
  // The CacheStorage global (`caches`) only exists in secure contexts. Without the typeof guard,
  // touching it on a plain-http non-localhost host raises a ReferenceError that would abort this
  // function before the reload below ever happens.
  if (!isLuminaryHost && typeof caches !== 'undefined') {
    // Best effort, fire-and-forget: the reload below does not wait for the deletions.
    caches.keys().then((names) => {
      names.forEach((name) => {
        caches.delete(name).catch(
          (error: Error) => console.error(`Browser cache clear failed ${error}`),
        );
      });
    }).catch((error: Error) => {
      console.error(`Browser cache clear failed ${error}`);
    });
  }

  // Increment the reload counter in session storage. If we surpass the limit, throw an error and
  // ask the user to refresh the page manually.
  const reloadCount = getSessionStorageData(RELOAD_COUNT_LOCAL_STORAGE_KEY, 0);
  if (reloadCount > MAX_RELOAD_COUNT) {
    // The embedded newline and indentation reproduce the original message text exactly.
    const message = 'Your client version is incompatible with the server.\n' +
      '        Please refresh the page.';
    addRpcError(message, new Error());
    throw Error(message);
  }
  maxReloadStorage.setItem(RELOAD_COUNT_LOCAL_STORAGE_KEY, String(reloadCount + 1));
  window.location.reload();
}

// Applies the auth-state side effects that an RPC error calls for.
//
//  - Unauthenticated + USER_NOT_FOUND subcode: marks the user as not found, unless authentication
//    is still pending (with or without registration). The error is then rethrown.
//  - Any other Unauthenticated error: logs it, clears the session and forces a login. The error
//    is then rethrown.
//  - An error whose message contains 'Token is expired': clears the session, forces a login and
//    returns true.
//
// Returns true if the page will be refreshed, to suppress error warnings before refresh; returns
// false when the error is not auth related.
function maybeHandleAuthError(err: ConnectError): boolean {
  const forceLogin = (): void => {
    jwt.clearSession();
    setAuthInfoV2({
      authState: AuthState.UNAUTHENTICATED,
      jwt: null,
    });
  };

  if (err.code === Code.Unauthenticated) {
    const userMissing = status.getPayload(err)?.subcode === basepb.Subcode.USER_NOT_FOUND;
    if (!userMissing) {
      // Session token validation status. Force login.
      logger.warn(`Session expired: ${status.stringifyError(err)}`);
      forceLogin();
    } else {
      const authPending = (
        getAuthInfoV2().authState === AuthState.AUTHENTICATION_PENDING_REGISTRATION ||
        getAuthInfoV2().authState === AuthState.AUTHENTICATION_PENDING
      );
      if (!authPending) {
        setAuthInfoV2({
          authState: AuthState.USER_NOT_FOUND,
          jwt: null,
        });
      }
    }
    throw err;
  }

  if (!err.message.includes('Token is expired')) {
    return false;
  }
  forceLogin();
  return true;
}

/**
 * Makes a unary rpc call through the given client, retrying on retriable errors.
 *
 * The call is retried (after a growing sleep that is capped at 5000ms) for as long as
 * status.shouldRetry() accepts the error or the error triggered a logout. Any other error is
 * rethrown, optionally after raising a notification. On success the incompatible-frontend reload
 * counter is reset.
 *
 * While the user is UNAUTHENTICATED, only the login related RPCs are actually sent; every other
 * label fails locally with an Unavailable error, which then goes through the same error handling
 * as a server error.
 *
 * @param clientIn the client object that `call` belongs to; `call` is bound to it.
 * @param label the RPC name. It is used for logging and notifications, and it also selects the
 *   login related RPCs that skip the auth check.
 * @param call one of the RPC stub methods defined inside the client object, e.g.
 *   client.newProject.
 * @param req the request object for that stub method.
 * @param catchAuthError when false, Unauthenticated errors are rethrown immediately, without the
 *   session handling done by maybeHandleAuthError.
 * @param toastOnError when true, non-retriable errors raise an RPC error notification (with some
 *   exceptions, see the body).
 * @returns the RPC response.
 * @throws the original error when it is not retriable, or an Error when the frontend is
 *   incompatible with the server and the reload limit was exceeded.
 */
export async function callRetryWithClient<Request extends Message, Reply extends Message>(
  clientIn: PromiseClient<typeof Frontend> | PromiseClient<typeof GeometryService>,
  label: string,
  call: (req: Request) => Promise<Reply>,
  req: Request,
  catchAuthError: boolean = true,
  toastOnError: boolean = true,
): Promise<Reply> {
  const boundCall = call.bind(clientIn);
  const maxBackoffMs = 5000;
  let backoffMsVar = 1000;
  let nRetries = 0;

  // Login related RPCs, which must go out even though the user is not authenticated yet.
  const authExemptLabels = [
    'ResetPassword',
    'SetPassword',
    'UseTicket',
    'AcceptTermsV2',
    'ResendActivationEmail',
    'LoginToken',
    'LoginCodeToken',
    'RecoveryLogin',
  ];

  for (; ;) {
    try {
      // skip auth checks for login related rpc
      if (
        getAuthInfoV2().authState === AuthState.UNAUTHENTICATED &&
        !authExemptLabels.includes(label)
      ) {
        // NOTE(review): the message presumably means "not yet authenticated"; the text is kept
        // as is in case other code matches on it.
        throw status.newGrpcError(
          Code.Unavailable,
          'User not yet unauthenticated',
          basepb.Subcode.UNKNOWN,
          [],
        );
      }
      const reply = await boundCall(req);
      // The server accepted this client, so forget earlier incompatible-frontend reloads. This
      // has to run before returning: a statement placed after the try/catch is never reached on
      // the success path.
      restartIncompatibleFrontend();
      return reply;
    } catch (err) {
      const grpcErr = status.getGrpcError(err);
      if (!grpcErr) {
        addRpcError(`Error in ${label}: non-grpc error`, err);
        throw err;
      }
      logger.warn(`${label}: grpc error: ${grpcErr.code}, ${grpcErr.message}`);
      // May reload the page, or throw once the reload limit has been exceeded.
      handleIncompatibleFrontend(err);
      if (!catchAuthError && isAuthError(grpcErr)) {
        throw err;
      }
      // Throws for Unauthenticated errors; returns true when the session was cleared because the
      // token expired.
      const logoutUser = maybeHandleAuthError(grpcErr);
      if (!status.shouldRetry(grpcErr) && !logoutUser) {
        if (toastOnError) {
          const projectDenied = (
            isProjectAccessDeniedError(grpcErr.message) ||
            isProjectDeletedError(grpcErr.message)
          );
          if (!projectDenied && !['LoginToken', 'RecoveryLogin'].includes(label)) {
            if (grpcErr.code === Code.PermissionDenied) {
              const payload = status.getPayload(err);
              if (payload?.subcode === basepb.Subcode.EMAIL_DOMAIN_NOT_IN_ALLOWED_DOMAINS) {
                addRpcError(
                  "Unable to add the user because the user's email domain " +
                  'is not in the list of allowed domains. ' +
                  'Click on the Security tab under ' +
                  'Administration to add the domain.',
                  err,
                );
              } else {
                addRpcError('Permission denied', err);
              }
            } else if (!Object.prototype.hasOwnProperty.call(ERRORS, label)) {
              // Labels that have an entry in ERRORS get no generic notification here.
              addRpcError(`${label} failed`, err);
            }
          }
        }
        throw err;
      }
      // Grow the delay by 20% per retry, but never sleep longer than maxBackoffMs.
      backoffMsVar = Math.min(backoffMsVar * 1.2, maxBackoffMs);
      nRetries += 1;
      logger.warn(
        `${label}: retriable error: ${status.stringifyError(grpcErr)}; ` +
        `sleeping = ${backoffMsVar} retries = ${nRetries} `,
      );
      await sleep(backoffMsVar);
      logger.warn(`${label}: retrying`);
    }
  }
}

/**
 * Convenience form of callRetryWithClient that always goes through the default frontend
 * `client`. All other arguments are forwarded unchanged.
 */
export async function callRetry<Request extends Message, Reply extends Message>(
  label: string,
  call: (req: Request) => Promise<Reply>,
  req: Request,
  catchAuthError: boolean = true,
  toastOnError: boolean = true,
): Promise<Reply> {
  return callRetryWithClient(client, label, call, req, catchAuthError, toastOnError);
}

// Default delay, in milliseconds, between the moment the ref count of a pooled streaming RPC
// drops to zero and the moment the RPC is actually stopped (see StreamingRpcPool.start).
const STREAM_RPC_SHUTDOWN_DELAY_MS = 2000;

/**
 * Runs a single server-side streaming RPC. The RPC is started on construction and is restarted,
 * after a backoff, whenever it fails with a retriable error. It ends when stop() is called, when
 * the server closes the stream, or when a non-retriable error occurs.
 */
class StreamingRpcRunner<Request extends Message, Reply extends Message> {
  // Ongoing streaming RPC. It is null while a restart is pending.
  private stream: AsyncIterable<Reply> | null = null;

  // Used to cancel the ongoing streaming RPC.
  private abortController: AbortController | null = null;

  // Number of restarts since the last reply was received; it drives the restart backoff.
  private nRetries = 0;
  // Set by stop(); the RPC is never restarted afterwards.
  private stopped = false;
  // Pending restart timer, if any.
  private restartTimer: ReturnType<typeof setTimeout> | null = null;
  // This flag is set to true when the streaming RPC is over and will not be restarted by this
  // runner: the reply loop ended without an error, or a non-retriable error was reported through
  // onError.
  public serverClosed = false;

  private readonly call: (
    request: Request,
    options?: CallOptions
  ) => AsyncIterable<Reply>;

  /**
   * The arg "call" is a server-side streaming Frontend method, e.g., client.GetProject. onRequest
   * is a function that generates an RPC payload. It will be called just before issuing a call.
   * The onReply function is called on every reply from the server. onError is called when the
   * RPC fails with an error that is neither retriable nor an auth error. onRestart, if given, is
   * called each time a restart is scheduled. The caller must run stop() to stop the ongoing RPC.
   *
   * The label is used only for logging.
   */
  constructor(
    private readonly label: string,
    call: (
      req: Request,
      options?: CallOptions
    ) => AsyncIterable<Reply>,
    private readonly onRequest: () => Request,
    private readonly onReply: (reply: Reply) => void,
    private readonly onError: (err: ConnectError) => void,
    private readonly onRestart: (() => void) | undefined,
    clientIn: PromiseClient<typeof Frontend> | PromiseClient<typeof GeometryService>,
  ) {
    this.call = call.bind(clientIn);
    this.launch();
  }

  /**
  * Stop the ongoing RPC. It is typically called in the React.useEffect unmount handler. After stop,
  * no other method may be called.
  */
  stop(): void {
    logger.debug(`stop ${this.label}`);
    this.stopped = true;
    if (this.stream !== null) {
      if (this.abortController) {
        this.abortController.abort();
        this.abortController = null;
      }
    }
    if (this.restartTimer) {
      clearTimeout(this.restartTimer);
      // Do not keep a handle to a timer that will never fire.
      this.restartTimer = null;
    }
  }

  /**
   * Starts the RPC and logs any failure of startRpc instead of letting it escape. Both the
   * constructor and the restart timer go through here, so that a rejection of startRpc never
   * becomes an unhandled promise rejection.
   */
  private launch(): void {
    this.startRpc().catch((err) => {
      logger.error(`Failed to start RPC label=${this.label} err=${err}`);
    });
  }

  /**
   * Called on an RPC error. It cancels the current RPC, then schedules to start a new one after a
   * backoff. Nothing is scheduled when the runner was stopped, when "stream" is no longer the
   * current stream, or when a restart is already pending.
   */
  private scheduleRestart(stream: AsyncIterable<Reply>): void {
    if (this.abortController) {
      this.abortController.abort();
      this.abortController = null;
    }
    if (this.stream === stream && !this.stopped && !this.restartTimer) {
      this.onRestart?.();
      this.stream = null;
      const backoff = backoffMs(this.nRetries);
      logger.debug(
        `restarting ${this.label} after` +
        ` ${backoff}ms, nretries=${this.nRetries} `,
      );
      // startRpc is async and can reject, so the timer must not drop its promise.
      this.restartTimer = setTimeout(() => this.launch(), backoff);
      this.nRetries += 1;
    }
  }

  /**
   * Issues the streaming RPC and consumes its replies until the stream ends or fails.
   *
   * It rejects when it is called while a stream is active or after stop(), when the error of a
   * failed stream cannot be parsed as a grpc error, and when one of the error handlers throws
   * (e.g. maybeHandleAuthError rethrows Unauthenticated errors).
   */
  private async startRpc(): Promise<void> {
    if (this.stream || this.stopped) {
      throw Error(`${this.label} invalid: stream=${this.stream} stopped=${this.stopped}`);
    }
    // Create a new AbortController so that we cancel the streaming RPC.
    this.abortController = new AbortController();
    logger.debug(`start ${this.label}`);
    this.restartTimer = null;
    const abortSignal = this.abortController.signal;
    const stream = this.call(this.onRequest(), {
      signal: abortSignal,
    });
    this.stream = stream;
    try {
      // eslint-disable-next-line no-restricted-syntax
      for await (const reply of stream) {
        // A reply proves that the connection works: reset the backoff and the reload counter.
        this.nRetries = 0;
        restartIncompatibleFrontend();
        // Discard messages if we received them but the RPC was cancelled
        if (abortSignal.aborted) {
          break;
        }
        logger.debug(`${this.label} got data`);
        this.onReply(reply);
      }
      logger.debug(`${this.label} end: nretries = ${this.nRetries} `);
      this.serverClosed = true;
    } catch (err) {
      // Handle the error here
      const grpcErr = status.getGrpcError(err);
      if (!grpcErr) {
        throw Error(`could not parse grpc error ${status.stringifyError(err)}`);
      }
      // A cancellation that follows stop() is expected, it is not an error.
      if (grpcErr.code === Code.Canceled && this.stopped) {
        logger.debug(`${this.label} cancelled`);
        return;
      }
      logger.warn(`${this.label} code: ${grpcErr.code} error: ${status.stringifyError(grpcErr)} `);
      handleIncompatibleFrontend(grpcErr);
      const logoutUser = maybeHandleAuthError(grpcErr);
      if (!status.shouldRetry(grpcErr) && !logoutUser) {
        this.onError(grpcErr);
        this.serverClosed = true;
      } else {
        this.scheduleRestart(stream);
      }
    }
  }
}

/**
 * Manages a shared pool of streaming RPCs. There is at most one active RPC per key, and it is
 * shared, with a ref count, by every caller that starts the same key.
 */
export class StreamingRpcPool<Request extends Message, Reply extends Message> {
  // Active RPCs by key, together with their ref count and shutdown state.
  private readonly activeRpcs: {
    [key: string]: {
      nRef: number;
      shuttingDown: boolean;
      shutdownTimer: ReturnType<typeof setTimeout> | null;
      rpc: StreamingRpcRunner<Request, Reply>;
    }
  } = {};

  /**
   * @param rpcName is the grpc method name; it prefixes the label of every RPC of this pool.
   * @param call is the matching method function in the "client".
   * @param restartable whether the streaming RPC may be restarted by the client if the server
   * closes the connection.
   */
  constructor(
    private readonly rpcName: string,
    private readonly call: (
      request: Request,
      options?: CallOptions
    ) => AsyncIterable<Reply>,
    private readonly restartable: boolean = false,
  ) {
  }

  // Takes a reference on the streaming RPC for the given key, starting the RPC first when the key
  // has no active RPC. The key identifies the resource read by the request; for example, the
  // GetProject rpc should pass the project ID as the key.
  //
  // Callbacks:
  //  - onRequest generates the RPC request. It is called every time a new RPC starts.
  //  - onReply is called every time an RPC reply is received.
  //  - onError is called when the streaming RPC hits an error, except for authentication errors
  //    and retriable errors (according to status.shouldRetry).
  //  - onStart is only called when the RPC is added to the list of active RPCs.
  //  - onRestart is called when the streaming RPC is restarted due to a retriable error.
  //  - onStop is only called when the RPC is removed from the list of active RPCs, i.e. when its
  //    ref count went to zero and the shutdown delay has passed.
  //
  // If start is called for the same key multiple times, only the 1st call takes effect. The
  // caller must ensure that two calls with the same key also pass the same onRequest, onReply and
  // onError.
  //
  // start() returns a cancel callback, which the caller must run exactly once, when the component
  // unmounts. Once every outstanding start call has been cancelled, the streaming RPC is stopped
  // after shutDownDelayMs milliseconds, or immediately if the delay is 0. A start call that
  // arrives within the delay keeps the RPC alive.
  start(
    key: string,
    onRequest: () => Request,
    onReply: (reply: Reply) => void,
    onError: (err: ConnectError) => void,
    clientIn: PromiseClient<typeof Frontend> | PromiseClient<typeof GeometryService> = client,
    onStop: (() => void) | undefined = undefined,
    onStart: (() => void) | undefined = undefined,
    onRestart: (() => void) | undefined = undefined,
    shutDownDelayMs: number = STREAM_RPC_SHUTDOWN_DELAY_MS,
  ): () => void {
    // A restartable RPC whose connection was closed by the server is reset by dropping its map
    // entry, so that a fresh one is created below.
    if (this.restartable && this.activeRpcs[key]?.rpc.serverClosed) {
      delete this.activeRpcs[key];
    }
    let entry = this.activeRpcs[key];
    if (!entry) {
      const runner = new StreamingRpcRunner<Request, Reply>(
        `${this.rpcName}/${key}`,
        this.call,
        onRequest,
        onReply,
        onError,
        onRestart,
        clientIn,
      );
      entry = {
        rpc: runner,
        shuttingDown: false,
        shutdownTimer: null,
        nRef: 0,
      };
      this.activeRpcs[key] = entry;
      onStart?.();
    }

    // Revive an entry that is waiting for its delayed shutdown.
    if (entry.shuttingDown && entry.nRef === 0) {
      entry.shuttingDown = false;
      if (entry.shutdownTimer) {
        clearTimeout(entry.shutdownTimer);
        entry.shutdownTimer = null;
      }
    }
    entry.nRef += 1;

    // Stops the runner and removes the entry from the pool.
    const finishShutdown = (): void => {
      // Check if the RPC is not in use and if it is the active RPC. If the RPC can be
      // restarted, we cannot throw since we restart RPCs by recreating the map entry.
      if (!this.restartable && (entry.nRef !== 0 || this.activeRpcs[key] !== entry)) {
        throw Error(`rpc ${key} modified in the background, nref=${entry.nRef}`);
      }
      logger.debug(`Stopping RPC ${key}`);
      entry.rpc.stop();
      if (this.activeRpcs[key] === entry) {
        delete this.activeRpcs[key];
      }
      onStop?.();
    };

    // The cancel callback: drops one reference and, on the last one, shuts the RPC down.
    const release = (): void => {
      logger.debug(`Cancel RPC key=${key} nref=${entry.nRef}`);
      entry.nRef -= 1;
      if (entry.nRef < 0 || entry.shuttingDown) {
        throw Error(
          `unbalanced unref nref = ${entry.nRef} shuttingDown = ${entry.shuttingDown} ` +
          `timer = ${entry.shutdownTimer}`,
        );
      }
      if (entry.nRef !== 0) {
        return;
      }
      // We track if we're shutting down separately from the timer, if shutdown delay is 0
      // we won't have the timer
      entry.shuttingDown = true;
      if (shutDownDelayMs > 0) {
        entry.shutdownTimer = setTimeout(finishShutdown, shutDownDelayMs);
      } else {
        // If the delay is 0 we need to call shutdown immediately, otherwise
        // the call is still delayed til the next event loop when timeouts run
        finishShutdown();
      }
    };
    return release;
  }
}
