import { RpcManager } from '../rpc.manager';
import { filter } from 'rxjs/operators';
import { BehaviorSubject, firstValueFrom, Subject, take } from 'rxjs';
import { DestructManager } from '../destruct.manager';
import { QueryRequest } from '../../../internal-common/src/types/query.types';
import { BatchQueryResultData, QueryResultData } from '../../../internal-common/src/types/socket.types';

interface QueryRequestAndResponseSubject {
  queryRequest: QueryRequest;
  responseSubject: Subject<QueryResultData>;
}

/** @internal */
export class QuerySender {
  /**
   * A collection of query requests awaiting batch dispatch.
   */
  private readonly pendingQueryRequests: Array<QueryRequestAndResponseSubject> = [];

  /**
   * A timeout identifier used to ensure that only the last query in rapid succession triggers the batch send process.
   */
  private pendingQueryBatchTimeout?: NodeJS.Timeout;

  /**
   * As long as there are mutations in flight we do not want to send queries because it causes race conditions,
   * preventing parallel queries and mutations simplifies the mental model.
   */
  readonly safeToSendQueriesToServer = new BehaviorSubject<boolean>(true);

  /**
   * The number of queries that are currently in-flight (about to be sent to the server or sent but did not get a
   * response yet). This is used by the data manager to prevent it from sending mutations while there are in-flight
   * queries.
   */
  private readonly inflightQueriesCount = new BehaviorSubject(0);

  constructor(
    private readonly rpcManager: RpcManager,
    private readonly destructManager: DestructManager,
  ) {
    this.destructManager.onPreDestruct(() => {
      this.preDestruct();
    });
  }

  async sendQuery(queryRequest: QueryRequest): Promise<QueryResultData> {
    const responseSubject = new Subject<QueryResultData>();
    const responsePromise = firstValueFrom(responseSubject);
    this.pendingQueryRequests.push({ queryRequest, responseSubject });
    if (this.pendingQueryBatchTimeout) {
      clearTimeout(this.pendingQueryBatchTimeout);
      this.pendingQueryBatchTimeout = undefined;
    }
    // A batch should not have more than 10 queries
    if (this.pendingQueryRequests.length >= 10) {
      void this.processQueryBatch();
      return responsePromise;
    }
    this.pendingQueryBatchTimeout = setTimeout(() => {
      this.safeToSendQueriesToServer.pipe(filter(Boolean), take(1)).subscribe(() => {
        this.processQueryBatch();
      });
    }, 0);

    return responsePromise;
  }

  private async processQueryBatch(): Promise<void> {
    const queryRequestAndSubjects = this.pendingQueryRequests.splice(0);
    /**
     * There is a race condition where the batch timeout is triggered but the safeToSendQueriesToServer is not yet
     * true. In this case this function will be invoked again once the safeToSendQueriesToServer is true within the
     * setTimeout handler, and it may cause the queryRequestAndSubjects to be empty. In this case we do not want to send
     * an empty batch.
     */
    if (!queryRequestAndSubjects.length) return;
    const pendingQueryRequests = queryRequestAndSubjects.map(({ queryRequest }) => queryRequest);
    const responseSubjects = queryRequestAndSubjects.map(({ responseSubject }) => responseSubject);
    this.inflightQueriesCount.next(this.inflightQueriesCount.value + pendingQueryRequests.length);
    try {
      const batchResponse = await this.rpcManager.post<BatchQueryResultData>(
        'query/batchQueries',
        pendingQueryRequests,
      );

      for (const { queryRequest, responseSubject } of queryRequestAndSubjects) {
        const clientRequestId = queryRequest.clientRequestId;
        const error = batchResponse.errors[clientRequestId];
        const result = batchResponse.results[clientRequestId];
        if (error) {
          responseSubject.error(error);
        } else {
          responseSubject.next(result);
        }
      }
    } catch (e) {
      responseSubjects.forEach(responseSubject => responseSubject.error(e));
    } finally {
      this.inflightQueriesCount.next(this.inflightQueriesCount.value - pendingQueryRequests.length);
    }
  }

  /** Will resolve once all the in-flight queries are done. */
  async waitForAllQueriesToFinish(): Promise<void> {
    return firstValueFrom(this.inflightQueriesCount.pipe(filter(count => count === 0))).then(() => {
      return undefined;
    });
  }

  private preDestruct(): void {
    this.safeToSendQueriesToServer.next(false);
    this.safeToSendQueriesToServer.complete();
  }
}
