import { ClientRequestId, DocTimestamp, IntegrationId, Mutation, SquidDocId, SquidDocument } from './public-types';
import { PromisePool } from '@supercharge/promise-pool';
import { assertTruthy, truthy } from 'assertic';
import { BehaviorSubject, filter, firstValueFrom, Subject, take } from 'rxjs';
import { map } from 'rxjs/operators';
import { DestructManager } from './destruct.manager';
import DocumentIdentityService from './document-identity.service';
import { DocumentStore } from './document-store';
import { MutationSender } from './mutation/mutation-sender';
import { QueryBuilderFactory } from './query/query-builder.factory';
import { QuerySender } from './query/query-sender';
import { QuerySubscriptionManager } from './query/query-subscription.manager';
import { SocketManager } from './socket.manager';
import { TransactionId } from './types';
import { LockManager } from '../../internal-common/src/utils/lock.manager';
import { generateId } from '../../internal-common/src/public-utils/id-utils';
import { normalizeJsonAsString } from '../../internal-common/src/utils/serialization';
import { replaceKeyInMap } from '../../internal-common/src/utils/object';
import { getSquidDocId, parseSquidDocId, SquidPlaceholderId } from '../../internal-common/src/types/document.types';
import { applyUpdateMutation, IdResolutionMap, reduceMutations } from '../../internal-common/src/types/mutation.types';
import {
  MessageToClient,
  MutationResultData,
  MutationsMessageToClient,
  QueryResultData,
} from '../../internal-common/src/types/socket.types';

/** A pair of callbacks that lets code outside of a promise executor settle that promise. */
interface PromiseResolver<T> {
  /** Fulfills the promise, optionally with a value. */
  resolve: (param?: T) => void;
  /** Rejects the promise with the given error. */
  reject: (error: unknown) => void;
}

/** A document state received from the server, either through a mutation notification or a query result. */
interface IncomingUpdateItem {
  /**
   * The document content. When it is `undefined` the update is saved as "no document" and the server-timestamp entry
   * of the document is marked to expire.
   */
  properties: SquidDocument | undefined;
  /** The timestamp that came with the update; used to decide whether the update is newer than what is known. */
  timestamp: DocTimestamp;
}

/** A mutation that was applied locally and is kept until the server responds to it. */
interface OutgoingMutation {
  /** The mutation to send; it may be replaced by a reduced mutation while `sentToServer` is still false. */
  mutation: Mutation;
  /** Set to true once the mutation was picked up for sending to the server. */
  sentToServer: boolean;
}

/** The latest server timestamp known for a document, with an optional expiration time for the record itself. */
export interface DocTimestampMetadata {
  /** The latest server timestamp known for the document. */
  timestamp: DocTimestamp;
  /**
   * Epoch milliseconds after which the cleanup job may drop this record (if the document is not tracked). Unset
   * while the record must be kept.
   */
  expireTimestamp?: number;
}

/**
 * Name of the lock that serializes transactions: two transactions cannot run in parallel, so a second transaction
 * is blocked on this mutex until the first one releases it.
 */
const RUN_IN_TRANSACTION_MUTEX = 'dataManager_runInTransaction';

/** @internal */
export class DataManager {
  /**
   * The timestamp of the local version of a document. It is set to the local clock time when a document is inserted
   * locally and to the server timestamp when an incoming update is applied. `isDirty` compares it with the server
   * timestamp.
   */
  private readonly docIdToLocalTimestamp = new Map<SquidDocId, DocTimestamp | undefined>();

  /** The id of the transaction that is currently running, or `undefined` when no transaction is running. */
  private currentTransactionId: TransactionId | undefined;

  /**
   * During a batch, any update to a document that may trigger an update to a query is collected here and once the batch
   * ends, the relevant subscribers to these queries will be updated.
   */
  private readonly batchClientRequestIds = new Set<ClientRequestId>();

  /**
   * In this map we save timestamps for documents that are available locally or recently deleted but need to remain
   * here so if a mutation comes from the server we know whether to apply it or not based on the timestamp. For
   * example, if a document is removed locally and immediately after, an update is received from the server - In that
   * case, it may be the update pre-dates the data that was available locally a second ago. For that reason, the
   * timestamp needs to be kept and even for removed documents it will be kept for ~20 seconds more.
   *
   * Eventually, this map is used as a gatekeeper for preventing old versions of a document (based on timestamp) to
   * appear on the client.
   */
  private readonly docIdToServerTimestamp = new Map<SquidDocId, DocTimestampMetadata>();

  /**
   * In the case of a local change (outgoing change) without a server timestamp, an incoming server update cannot be
   * applied and needs to be queued until the local state allows it. In this case the incoming update will be queued in
   * this map. Any future incoming server update to the same document will override the previous update in case it has
   * a later timestamp.
   */
  private readonly pendingIncomingUpdates = new Map<SquidDocId, IncomingUpdateItem>();

  /**
   * A mutation sent to the server will be stored in this map until it receives a timestamp from the server. These
   * mutations were already applied locally and were sent to the server or about to be sent if sentToServer=false
   * (or are queued in the MutationSender due to a lock). The existence of these pending mutations indicates the data
   * manager to:
   * 1 - Not apply any incoming server mutation while there are outgoing mutations without a timestamp
   * 2 - Not delete the local document even if there are no queries related to this document - there is a chance
   *     that there will be a future query that will need this document and the query needs to return the local version
   *     of the document.
   * 3 - Even when all outgoing mutations have a timestamp, it may be that there is a pending query in flight that will
   *     need to return the document. In this case, the local document may be different from the server. Hence, it
   *     cannot accept the server change and will wait for another update from the server. For this purpose, timestamp
   *     will be stored in docIdToServerTimestamp for ~20 more seconds.
   *
   * Note: Only one entry per squidDocId can be with sentToServer=false. This is true since all updates to the same doc
   *       in the same batch are appended (and reduced) to the same outgoing mutation object.
   */
  private readonly pendingOutgoingMutations = new Map<SquidDocId, Array<OutgoingMutation>>();
  /** Emits when an outgoing mutation entry is added to or removed from `pendingOutgoingMutations`. */
  private readonly pendingOutgoingMutationsChanged = new Subject<void>();
  /**
   * A subject for whether there are outgoing mutations. If there are outgoing mutations, any incoming update from the
   * server will be delayed until all the outgoing mutations will be acknowledged by the server.
   * This mechanism is needed to avoid this case:
   *
   * collection.docRef('a').delete();
   * collection.snapshots().subscribe((res) => {
   *   // The result here may include doc with id='a' since the delete mutation was not acknowledged.
   * });
   *
   * If we wait for the acknowledgment message, we will have a server timestamp and will be able to ignore the incoming
   * result for document with id='a'.
   */
  private readonly outgoingMutationsEmpty = new BehaviorSubject<boolean>(true);

  /**
   * When applying an outgoing mutation, there may be a short time that the mutation is not yet inserted into the
   * pending outgoing mutations map. In this case, we mark the document id as known to be dirty and remove it from the
   * set once the pending outgoing mutations map is updated.
   */
  private readonly knownDirtyDocs = new Set<SquidDocId>();

  /** Documents whose outgoing mutations failed; they are re-fetched from the server by `resyncFailedUpdates`. */
  private readonly failedDocsToResync: SquidDocId[] = [];
  /**
   * Documents listed in the `refreshList` of a mutation response, mapped to the timestamp of that response. They are
   * re-queried by `refreshUpdatedDocuments` unless a later server timestamp is already known.
   */
  private readonly refreshDocIdToTimestamp = new Map<SquidDocId, DocTimestamp>();

  /** The handle returned by `setInterval` for the expired-timestamps cleanup job; `undefined` once it is stopped. */
  private deleteExpiredTimestampsInterval: any;
  /**
   * When false, incoming mutations and query snapshots are ignored. Nothing in this class changes the value;
   * the name suggests that it is toggled by tests only - TODO confirm.
   */
  private handleIncomingMessagesForTests = true;

  /**
   * Stores the collaborators and wires the manager: registers the destruct hook, follows document id migrations,
   * starts listening to server notifications and orphan documents, starts the expired-timestamps cleanup job and
   * forwards the "no outgoing mutations" state to the query sender.
   */
  constructor(
    private readonly documentStore: DocumentStore,
    private readonly mutationSender: MutationSender,
    private readonly socketManager: SocketManager,
    private readonly querySubscriptionManager: QuerySubscriptionManager,
    private readonly queryBuilderFactory: QueryBuilderFactory,
    private readonly lockManager: LockManager,
    private readonly destructManager: DestructManager,
    private readonly documentIdentityService: DocumentIdentityService,
    private readonly querySender: QuerySender,
  ) {
    this.destructManager.onDestruct(() => this.destruct());

    this.documentIdentityService
      .observeChanges()
      .subscribe(idResolutionMap => this.migrateDocIds(idResolutionMap));

    this.handleNotifications();
    this.startDeleteExpiredTimestampsJob();
    this.handleOrphanDocs();

    // Queries should not be sent to the server before all outgoing updates were applied on the server, so the
    // query sender is told whether it is currently safe to send.
    this.outgoingMutationsEmpty.subscribe(noOutgoingMutations => {
      this.querySender.safeToSendQueriesToServer.next(noOutgoingMutations);
    });
  }

  /** Returns the document as it is stored locally, or `undefined` when the local store has no such document. */
  getProperties(squidDocId: SquidDocId): SquidDocument | undefined {
    const localDocument = this.documentStore.getDocumentOrUndefined(squidDocId);
    return localDocument;
  }

  /**
   * Whether a document has changes that are out of sync with the server. A document is dirty when any of these
   * holds: it is marked as known-dirty, it has pending outgoing mutations, it has a local timestamp without a
   * non-expiring server timestamp, it is forgotten or local-only, or its local and server timestamps differ.
   */
  isDirty(squidDocId: SquidDocId): boolean {
    if (this.knownDirtyDocs.has(squidDocId)) {
      return true;
    }
    const outgoingMutations = this.pendingOutgoingMutations.get(squidDocId);
    if (outgoingMutations?.length) {
      return true;
    }

    // A server timestamp that is scheduled to expire does not count as a server timestamp here.
    const serverMetadata = this.docIdToServerTimestamp.get(squidDocId);
    let serverTimestamp: DocTimestamp | undefined = undefined;
    if (serverMetadata && !serverMetadata.expireTimestamp) {
      serverTimestamp = serverMetadata.timestamp;
    }

    const localTimestamp = this.docIdToLocalTimestamp.get(squidDocId);
    const hasLocalTimestampOnly = localTimestamp && !serverTimestamp;
    if (hasLocalTimestampOnly || this.isForgotten(squidDocId) || this.isLocalOnly(squidDocId)) {
      return true;
    }
    return localTimestamp !== serverTimestamp;
  }

  /**
   * Runs the provided function without sending mutations to the server while collecting the updates to the different
   * queries. Local updates will still be applied. Once the batch finishes, all the updates will be sent to the server
   * and the different queries will be notified. runInTransaction may be invoked inside another runInTransaction, only
   * the top level batch will trigger updates to the server.
   *
   * @param fn The function to run; it receives the id of the transaction it runs in.
   * @param transactionId When provided, the call is treated as nested: the id must equal the id of the currently
   *   running transaction and `fn` runs as part of it (no lock is taken and the transaction is not finished here).
   * @returns For a top level call: a promise that is fulfilled with the result of `fn` only after
   *   `finishTransaction` resolves the resolver it was given, and that is rejected when `fn` throws, when the
   *   resolver is rejected or when the transaction bookkeeping fails.
   */
  async runInTransaction<T = any>(
    fn: (transactionId: TransactionId) => Promise<T>,
    transactionId?: TransactionId,
  ): Promise<T> {
    if (transactionId) {
      // Nested call - run as part of the transaction that is already in progress.
      assertTruthy(transactionId === this.currentTransactionId, () => `Invalid transaction ID: ${transactionId}`);
      return fn(transactionId).then((r: T) => Promise.resolve(r));
    }
    // Take the lock without awaiting when it is available, otherwise wait for the running transaction to release it.
    if (this.lockManager.canGetLock(RUN_IN_TRANSACTION_MUTEX)) {
      this.lockManager.lockSync(RUN_IN_TRANSACTION_MUTEX);
    } else {
      await this.lockManager.lock(RUN_IN_TRANSACTION_MUTEX);
    }
    // The first error that happens is kept; UNSET_VALUE distinguishes "no error" from a thrown `undefined`.
    let error: unknown = UNSET_VALUE;
    const hasError = (): boolean => error !== UNSET_VALUE;
    return new Promise<T>(async (resolve, reject) => {
      try {
        this.currentTransactionId = generateId();
        let res: any = undefined;
        try {
          res = await fn(this.currentTransactionId);
        } catch (e1) {
          error = e1;
        } finally {
          // The transaction is always finished. On success the resolver is handed over, so the promise is settled
          // by finishTransaction; on failure no resolver is passed and the promise is rejected below.
          this.finishTransaction(hasError() ? undefined : { resolve: (): void => resolve(res), reject });
        }
      } catch (e2) {
        error = hasError() ? error : e2;
      } finally {
        // The lock is released right after finishTransaction returns - it does not wait for the server.
        try {
          this.lockManager.release(RUN_IN_TRANSACTION_MUTEX);
        } catch (e3) {
          error = hasError() ? error : e3;
        }
      }
      if (hasError()) {
        reject(error);
      }
    });
  }

  /**
   * Applies a mutation done from the client (from DocumentReference) and sends it to the server.
   *
   * The mutation is first registered as a pending outgoing mutation (merged into the last unsent mutation of the
   * same document when there is one) and then applied to the local document store inside a transaction.
   *
   * @param mutation The mutation to apply. Internal properties are stripped from the copy that is sent to the
   *   server; the local document is built from the mutation as provided.
   * @param transactionId The id of the running transaction when invoked from inside one, otherwise `undefined`.
   */
  async applyOutgoingMutation(mutation: Mutation, transactionId: TransactionId | undefined): Promise<void> {
    const squidDocIdObj = mutation.squidDocIdObj;
    const squidDocId = getSquidDocId(squidDocIdObj);
    this.knownDirtyDocs.add(squidDocId);
    try {
      /**
       * When not in a transaction, we wait for the previous transaction to finish before processing the next
       * mutation to avoid mixing transactions.
       */
      if (!transactionId && !this.lockManager.canGetLock(RUN_IN_TRANSACTION_MUTEX)) {
        await this.lockManager.lock(RUN_IN_TRANSACTION_MUTEX);
        this.lockManager.release(RUN_IN_TRANSACTION_MUTEX);
      }
    } finally {
      /** Always clear the temporary mark, also when waiting for the lock fails, so the doc is not dirty forever. */
      this.knownDirtyDocs.delete(squidDocId);
    }

    /** The copy that goes to the server never carries internal properties - in both branches below. */
    const mutationForServer = this.removeInternalProperties(mutation);
    const lastOutgoingMutation = this.pendingOutgoingMutations.get(squidDocId)?.slice(-1)[0];
    if (lastOutgoingMutation && !lastOutgoingMutation.sentToServer) {
      lastOutgoingMutation.mutation = truthy(
        reduceMutations([lastOutgoingMutation.mutation, mutationForServer])[0],
        'Failed to reduce mutations',
      );
      this.outgoingMutationsEmpty.next(false);
    } else {
      const outgoingMutation: OutgoingMutation = {
        mutation: mutationForServer,
        sentToServer: false,
      };
      const ar = this.pendingOutgoingMutations.get(squidDocId) || [];
      ar.push(outgoingMutation);
      this.pendingOutgoingMutations.set(squidDocId, ar);
      this.outgoingMutationsEmpty.next(false);
      this.pendingOutgoingMutationsChanged.next();
    }
    return this.runInTransaction(async () => {
      const docBefore = this.documentStore.getDocumentOrUndefined(squidDocId);
      const docAfter =
        mutation.type === 'delete'
          ? undefined
          : mutation.type === 'update'
            ? applyUpdateMutation(docBefore, mutation)
            : { ...mutation.properties };
      const updated = this.updateDocumentFromSnapshot(squidDocId, docAfter);
      /** Nothing changed locally - there are no queries to notify. The mutation is still sent to the server. */
      if (!updated) return;
      if (mutation.type === 'insert') {
        this.docIdToLocalTimestamp.set(squidDocId, Date.now());
      }
      const allClientRequestIds = this.querySubscriptionManager.setClientRequestIdsForLocalDoc(squidDocId, docAfter);
      allClientRequestIds.forEach(clientRequestId => this.batchClientRequestIds.add(clientRequestId));
    }, transactionId);
  }

  /**
   * Same as runInTransaction with the exception that the passed function runs synchronously.
   *
   * With a `transactionId` the function is invoked right away as part of the running transaction. Without one, the
   * transaction lock is awaited first, the function runs under a new transaction id and the transaction is finished
   * afterwards. Errors thrown while the new transaction runs are logged and not re-thrown.
   */
  private async runInTransactionSync(
    fn: (transactionId: TransactionId) => void,
    transactionId?: TransactionId,
  ): Promise<void> {
    if (!transactionId) {
      await this.lockManager.lock(RUN_IN_TRANSACTION_MUTEX);
      try {
        this.currentTransactionId = generateId();
        try {
          return fn(this.currentTransactionId);
        } catch (callbackError) {
          console.error('error while executing callback function in transaction', callbackError);
        } finally {
          // The transaction is finished whether the callback succeeded or not.
          this.finishTransaction();
        }
      } catch (transactionError) {
        console.error('error while executing transaction', transactionError);
      } finally {
        this.lockManager.release(RUN_IN_TRANSACTION_MUTEX);
      }
      return;
    }

    // Nested call - the provided id must be the id of the transaction that is currently running.
    assertTruthy(transactionId === this.currentTransactionId, () => `Invalid transaction ID: ${transactionId}`);
    fn(transactionId);
  }

  /**
   * Returns a copy of the mutation whose properties do not include the internal `__docId__` and `__ts__` keys,
   * which should not be sent to the server. Delete mutations are returned as they are.
   */
  private removeInternalProperties(mutation: Mutation): Mutation {
    if (mutation.type !== 'delete') {
      // Copy the mutation and its properties so the caller's objects are left untouched.
      const sanitized = { ...mutation, properties: { ...mutation.properties } } as any;
      for (const internalKey of ['__docId__', '__ts__']) {
        delete sanitized.properties[internalKey];
      }
      return sanitized;
    }
    return mutation;
  }

  /**
   * Listens to mutation notifications from the socketManager and to query results from the
   * querySubscriptionManager. Each message is handled only once there are no outgoing mutations.
   */
  private handleNotifications(): void {
    // Runs the action the first time `outgoingMutationsEmpty` holds `true` (its current value included).
    const runWhenNoOutgoingMutations = (action: () => void): void => {
      this.outgoingMutationsEmpty.pipe(filter(Boolean), take(1)).subscribe(() => {
        action();
      });
    };

    const mutationNotifications = this.socketManager.observeNotifications().pipe(
      filter((notification: MessageToClient) => notification.type === 'mutations'),
      map(n => n as MutationsMessageToClient),
    );
    mutationNotifications.subscribe(notification => {
      runWhenNoOutgoingMutations(() => this.handleIncomingMutations(notification.payload));
    });

    this.querySubscriptionManager.observeQueryResults().subscribe(queryResult => {
      runWhenNoOutgoingMutations(() => this.handleIncomingQuerySnapshots(queryResult));
    });
  }

  /**
   * Applies the mutations received from the server. Items whose client request id has no ongoing query are
   * skipped; when several items refer to the same document, the last one wins.
   */
  private handleIncomingMutations(payload: Array<MutationResultData>): void {
    if (!this.handleIncomingMessagesForTests) return;

    const squidDocIdToNewData: Record<SquidDocId, IncomingUpdateItem> = {};
    for (const item of payload) {
      if (!this.querySubscriptionManager.hasOngoingQuery(item.clientRequestId)) continue;
      squidDocIdToNewData[item.squidDocId] = {
        properties: item.doc,
        timestamp: item.mutationTimestamp,
      };
    }
    this.applyIncomingUpdates(squidDocIdToNewData);
  }

  /**
   * Applies the documents of a query result received from the server, inside a transaction. The result is ignored
   * when its query is no longer ongoing.
   */
  private handleIncomingQuerySnapshots(queryResult: QueryResultData): void {
    const shouldIgnore =
      !this.handleIncomingMessagesForTests ||
      !this.querySubscriptionManager.hasOngoingQuery(queryResult.clientRequestId);
    if (shouldIgnore) return;

    const { collectionName, integrationId } = this.querySubscriptionManager.getQuery(queryResult.clientRequestId);
    const incomingByDocId: Record<SquidDocId, IncomingUpdateItem> = {};
    for (const incomingDoc of queryResult.docs) {
      const squidDocId = getSquidDocId(incomingDoc.__docId__, collectionName, integrationId);
      incomingByDocId[squidDocId] = { properties: incomingDoc, timestamp: incomingDoc.__ts__ };
    }

    this.runInTransactionSync(transactionId => {
      this.querySubscriptionManager.setGotInitialResult(queryResult.clientRequestId);
      this.batchClientRequestIds.add(queryResult.clientRequestId);
      const hadOutdatedDocuments = this.applyIncomingUpdates(incomingByDocId, transactionId);
      /**
       * If some documents were outdated, we should wait for the actual updates to arrive before we trigger the
       * queries. Unless, the query has no subscription - in this case a single snapshot was requested.
       */
      const hasSubscription = (): boolean =>
        this.querySubscriptionManager.hasSubscription(queryResult.clientRequestId);
      if (hadOutdatedDocuments && hasSubscription()) {
        this.batchClientRequestIds.delete(queryResult.clientRequestId);
      }
    }).then();
  }

  /**
   * Queues the incoming updates that are not older than what the client already knows and applies them (where
   * possible) inside a transaction.
   *
   * @returns Whether some updates were ignored because a pending incoming update with a later timestamp exists
   *   for their document. In that case the pending update is the one that will trigger the change. An update that
   *   is older than the known server timestamp, without such a pending update, does not affect the returned value:
   *   it is not applied to the document store, but the query mapping of the document is still refreshed.
   */
  private applyIncomingUpdates(
    squidDocIdToNewData: Record<SquidDocId, IncomingUpdateItem>,
    transactionId?: TransactionId,
  ): boolean {
    let someDocumentsWereOutdated = false;
    const docIdsToApply = new Set<SquidDocId>();
    const docIdsToRefresh = new Set<SquidDocId>();

    for (const [squidDocId, incoming] of Object.entries(squidDocIdToNewData)) {
      const queuedUpdate = this.pendingIncomingUpdates.get(squidDocId);
      if (queuedUpdate && queuedUpdate.timestamp > incoming.timestamp) {
        /** A later update is already queued for this document - skip notifying this query. */
        someDocumentsWereOutdated = true;
        continue;
      }

      const knownMetadata = this.docIdToServerTimestamp.get(squidDocId);
      const clientKnowsLaterTimestamp = !!knownMetadata && knownMetadata.timestamp > incoming.timestamp;
      if (clientKnowsLaterTimestamp) {
        /**
         * The incoming update is stale. Since the ClientRequestId -> SquidDocId mapping may have been cleared
         * before the update, the map of ongoing queries to local documents still needs to be refreshed.
         */
        docIdsToRefresh.add(squidDocId);
      } else {
        /** The client does not know of a later timestamp for this document - queue the update to be applied. */
        this.pendingIncomingUpdates.set(squidDocId, incoming);
        docIdsToApply.add(squidDocId);
      }
    }

    this.runInTransactionSync(() => {
      docIdsToApply.forEach(squidDocId => this.maybeApplyIncomingUpdate(squidDocId));
      docIdsToRefresh.forEach(squidDocId => this.refreshQueryMapping(squidDocId));
    }, transactionId).then();
    return someDocumentsWereOutdated;
  }

  /**
   * Applies the queued incoming update of the document, unless there is none or the document still has pending
   * outgoing mutations (in which case the update stays queued).
   */
  private maybeApplyIncomingUpdate(squidDocId: SquidDocId): void {
    const queuedUpdate = this.pendingIncomingUpdates.get(squidDocId);
    if (!queuedUpdate) return;
    if (this.pendingOutgoingMutations.get(squidDocId)?.length) return;

    const { properties, timestamp } = queuedUpdate;
    this.updateDocumentFromSnapshot(squidDocId, properties);
    // An update without properties makes the server-timestamp entry expire.
    this.acknowledgeDocument(squidDocId, timestamp, !properties);
    this.docIdToLocalTimestamp.set(squidDocId, timestamp);

    // The update was applied locally, so it is no longer pending.
    this.pendingIncomingUpdates.delete(squidDocId);

    this.refreshQueryMapping(squidDocId);
  }

  /**
   * Re-computes which queries the local document belongs to, schedules those queries for notification at the end
   * of the batch and forgets the document when no query is left to keep it up-to-date.
   */
  private refreshQueryMapping(squidDocId: SquidDocId): void {
    const localDoc = this.documentStore.getDocumentOrUndefined(squidDocId);
    const affectedClientRequestIds = this.querySubscriptionManager.setClientRequestIdsForLocalDoc(
      squidDocId,
      localDoc,
    );
    for (const clientRequestId of affectedClientRequestIds) {
      this.batchClientRequestIds.add(clientRequestId);
    }

    if (!localDoc) return;

    /**
     * The document was not deleted. If no ongoing query will keep it up-to-date, the document is considered
     * "forgotten" and its docIdToServerTimestamp entry gets an expireTimestamp.
     */
    const queriesForDocument = this.querySubscriptionManager.findQueriesForDocument(localDoc, squidDocId);
    if (!queriesForDocument.length) {
      this.forgetDocument(squidDocId);
    }
  }

  /** Invoked through the destruct manager hook; stops the expired-timestamps cleanup job. */
  private destruct(): void {
    this.stopDeleteExpiredTimestampsJob();
  }

  /** Cancels the expired-timestamps cleanup interval, if one is active. */
  private stopDeleteExpiredTimestampsJob(): void {
    const intervalHandle = this.deleteExpiredTimestampsInterval;
    if (intervalHandle !== undefined) {
      clearInterval(intervalHandle);
      this.deleteExpiredTimestampsInterval = undefined;
    }
  }

  /**
   * Starts a job that runs every 10 seconds and removes from docIdToServerTimestamp the entries of documents that
   * are no longer relevant for this client. An entry is removed when its expireTimestamp has passed and the
   * document is not tracked, i.e. it has no pending incoming or outgoing updates and no ongoing query. A removed
   * document is also forgotten.
   */
  private startDeleteExpiredTimestampsJob(): void {
    const removeExpiredEntries = (): void => {
      // Collect first and delete afterwards, so the map is not modified while it is being iterated.
      const expiredDocIds: SquidDocId[] = [];
      for (const [squidDocId, { expireTimestamp }] of this.docIdToServerTimestamp.entries()) {
        const isExpired = !!expireTimestamp && expireTimestamp <= Date.now();
        if (isExpired && !this.isTracked(squidDocId)) {
          expiredDocIds.push(squidDocId);
        }
      }
      expiredDocIds.forEach(squidDocId => {
        this.docIdToServerTimestamp.delete(squidDocId);
        this.forgetDocument(squidDocId);
      });
    };
    this.deleteExpiredTimestampsInterval = setInterval(removeExpiredEntries, 10000);
  }

  /**
   * Whether the document is tracked: it has a pending incoming update, at least one pending outgoing mutation or
   * an ongoing query.
   */
  isTracked(squidDocId: string): boolean {
    if (this.pendingIncomingUpdates.get(squidDocId)) {
      return true;
    }
    if (this.pendingOutgoingMutations.get(squidDocId)?.length) {
      return true;
    }
    return this.querySubscriptionManager.hasOngoingQueryForDocId(squidDocId);
  }

  /**
   * Whether the document has data locally but is no longer tracked by any mutations or queries. This is often the
   * case for in-memory DocumentReferences that are not part of any query.
   */
  isForgotten(squidDocId: SquidDocId): boolean {
    if (!this.documentStore.hasData(squidDocId)) {
      return false;
    }
    return !this.isTracked(squidDocId);
  }

  /**
   * Whether a document only exists locally. This means that the document has never been sent to or received from
   * the server: it has local data but no server acknowledgement.
   */
  isLocalOnly(squidDocId: SquidDocId): boolean {
    if (this.hasBeenAcknowledged(squidDocId)) {
      return false;
    }
    return this.documentStore.hasData(squidDocId);
  }

  /**
   * Whether the document has ever been acknowledged by the server, i.e. whether a server timestamp is currently
   * kept for it. Acknowledgements occur when an incoming query or mutation is received, and when an outgoing
   * mutation is acknowledged.
   */
  hasBeenAcknowledged(squidDocId: SquidDocId): boolean {
    return this.docIdToServerTimestamp.has(squidDocId);
  }

  /**
   * Saves the new state of the document in the document store.
   *
   * @returns true if the document was saved, false if it did not change: both states are missing, both are the
   *   same object, or the normalized JSON of the new state (without `__ts__`) equals that of the stored document.
   */
  private updateDocumentFromSnapshot(squidDocId: SquidDocId, doc: SquidDocument | undefined): boolean {
    const storedDoc = this.documentStore.getDocumentOrUndefined(squidDocId);
    const bothMissing = !storedDoc && !doc;
    if (bothMissing || storedDoc === doc) {
      return false;
    }
    if (storedDoc && doc) {
      const incomingJson = normalizeJsonAsString({ ...doc, __ts__: undefined });
      const storedJson = normalizeJsonAsString(storedDoc);
      if (incomingJson === storedJson) {
        return false;
      }
    }
    this.documentStore.saveDocument(squidDocId, doc);
    return true;
  }

  /**
   * Ends the current transaction: clears the transaction id, notifies the subscriptions of all the queries that
   * were collected during the batch and starts sending the unsent outgoing mutations (without waiting for them).
   *
   * @param promiseResolver Passed on to `sendAllUnsentOutgoingMutations`, which settles it.
   */
  private finishTransaction(promiseResolver?: PromiseResolver<unknown>): void {
    this.currentTransactionId = undefined;

    // Take a snapshot of the collected ids and reset the set before notifying.
    const clientRequestIdsToNotify = Array.from(this.batchClientRequestIds);
    this.batchClientRequestIds.clear();
    this.querySubscriptionManager.notifyAllSubscriptions(clientRequestIdsToNotify);

    this.sendAllUnsentOutgoingMutations(promiseResolver).then();
  }

  /**
   * Sends all the outgoing mutations that were not sent yet, grouped by integration, and settles the given resolver.
   *
   * On success the resolver is resolved once no pending mutation is marked as sent (waiting for the pending
   * mutations to change if needed). On failure the failed documents are re-synced (only when no outgoing mutation
   * is left) and the resolver is rejected with the original error.
   *
   * @param promiseResolver Optional callbacks of the promise returned to the caller of the transaction. One of the
   *   callbacks is always invoked, so that promise cannot stay pending forever.
   */
  private async sendAllUnsentOutgoingMutations(promiseResolver?: PromiseResolver<unknown>): Promise<void> {
    const outgoingMutationsByIntegrationId = this.groupOutgoingMutationsByIntegrationId();
    try {
      await PromisePool.for(outgoingMutationsByIntegrationId)
        .withConcurrency(outgoingMutationsByIntegrationId.length || 1)
        .handleError(e => {
          /** Re-throw, so a failure of one integration rejects the whole pool instead of being collected. */
          throw e;
        })
        .process(async ([integrationId, outgoingMutations]) => {
          await this.sendMutationsForIntegration([...outgoingMutations], integrationId);
        });

      if (!this.pendingOutgoingMutations.size) {
        this.outgoingMutationsEmpty.next(true);
      }

      await this.refreshUpdatedDocuments();
      if (this.hasPendingSentMutations()) {
        /** Mutations of other batches are still in flight - wait until none of them is pending. */
        await firstValueFrom(this.pendingOutgoingMutationsChanged.pipe(filter(() => !this.hasPendingSentMutations())));
      }
      promiseResolver?.resolve();
    } catch (e) {
      try {
        if (!this.pendingOutgoingMutations.size) {
          this.outgoingMutationsEmpty.next(true);
          await this.resyncFailedUpdates();
        }
      } catch (resyncError) {
        /** A failure of the re-sync must not prevent the rejection below - the caller would wait forever. */
        console.error('error while re-syncing failed updates', resyncError);
      }
      promiseResolver?.reject(e);
    }
  }

  /**
   * Sends the mutations of a single integration to the server and updates the local bookkeeping.
   *
   * On success: migrates the resolved document ids, schedules the documents of the refresh list for a refresh,
   * removes the mutations from the pending map and acknowledges their documents with the response timestamp.
   * On failure: removes the mutations from the pending map, forgets their documents, schedules a re-sync for
   * documents that were acknowledged before or were inserted by the failed mutation, and re-throws the error.
   */
  private async sendMutationsForIntegration(
    outgoingMutations: OutgoingMutation[],
    integrationId: string,
  ): Promise<void> {
    try {
      const mutationsToSend = outgoingMutations.map(({ mutation }) => mutation);
      const response = await this.mutationSender.sendMutations(mutationsToSend, integrationId);
      const { timestamp, idResolutionMap = {}, refreshList = [] } = response;

      this.documentIdentityService.migrate(idResolutionMap);
      for (const docId of refreshList) {
        this.refreshDocIdToTimestamp.set(idResolutionMap[docId] || docId, timestamp);
      }

      for (const outgoingMutation of outgoingMutations) {
        const removedDocId = this.removeOutgoingMutation(outgoingMutation);
        const squidDocId = idResolutionMap[removedDocId] || removedDocId;
        this.acknowledgeDocument(squidDocId, timestamp);
        if (this.isTracked(squidDocId)) continue;
        /**
         * The document is safe to delete. Make sure the timestamp will persist for a bit more since it may be
         * that there are incoming updates that will not reflect the update that was sent in the outgoingMutation.
         */
        this.setExpiration(squidDocId, true);
        this.forgetDocument(squidDocId);
      }
    } catch (e) {
      for (const outgoingMutation of outgoingMutations) {
        const squidDocId = this.removeOutgoingMutation(outgoingMutation);
        this.forgetDocument(squidDocId);
        /** If the change is a local insert, and it failed, need to re-sync or remove locally */
        const needsResync = this.hasBeenAcknowledged(squidDocId) || outgoingMutation.mutation.type === 'insert';
        if (needsResync) {
          this.failedDocsToResync.push(squidDocId);
        }
      }
      throw e;
    }
  }

  /**
   * Removes the outgoing mutation from the pending mutations of its document and signals that the pending
   * mutations changed. The map entry of the document is deleted when no mutation is left for it.
   *
   * @returns The squidDocId of the document that the mutation refers to.
   * @throws If the document has no pending outgoing mutations at all.
   */
  private removeOutgoingMutation(outgoingMutation: OutgoingMutation): string {
    const squidDocId = getSquidDocId(outgoingMutation.mutation.squidDocIdObj);
    const outgoingMutationsForDoc = truthy(this.pendingOutgoingMutations.get(squidDocId));
    const index = outgoingMutationsForDoc.indexOf(outgoingMutation);
    /** splice(-1, 1) would remove the last (unrelated) mutation, so only remove when the mutation was found. */
    if (index >= 0) {
      outgoingMutationsForDoc.splice(index, 1);
    }
    if (!outgoingMutationsForDoc.length) {
      this.pendingOutgoingMutations.delete(squidDocId);
    }
    this.pendingOutgoingMutationsChanged.next();
    return squidDocId;
  }

  /**
   * Re-fetches from the server, one by one, the documents whose mutations failed. A document that the server does
   * not return (or whose id is still a placeholder, in which case no query is made) is forgotten and the queries
   * it belonged to are notified. If the fetch fails, all the queries that touch the document are errored out.
   */
  private async resyncFailedUpdates(): Promise<void> {
    // Take the current list and empty the shared one in a single step.
    const docsToResync = this.failedDocsToResync.splice(0);
    for (const squidDocId of docsToResync) {
      const { docId } = parseSquidDocId(squidDocId);
      this.setExpiration(squidDocId, true);
      try {
        const hasPlaceholderId = docId.includes(SquidPlaceholderId);
        const results = hasPlaceholderId
          ? []
          : await this.queryBuilderFactory.getForDocument(squidDocId).setForceFetchFromServer().snapshot();
        truthy(results.length <= 1, 'Got more than one doc for the same id:' + squidDocId);
        if (results.length) continue;

        /** The document does not exist anymore, so we can forget about it */
        this.forgetDocument(squidDocId);
        const queriesToNotify = this.querySubscriptionManager.setClientRequestIdsForLocalDoc(squidDocId, undefined);
        this.querySubscriptionManager.notifyAllSubscriptions(queriesToNotify);
      } catch (e) {
        // Update failed, re-sync failed; need to fail all the queries that touch this doc
        this.querySubscriptionManager.errorOutAllQueries(squidDocId, e);
      }
    }
  }

  /**
   * Re-queries the documents that were scheduled for a refresh and clears the schedule. A document is skipped when
   * a server timestamp later than the scheduled one is already known for it. Failures of individual queries are
   * ignored.
   */
  private async refreshUpdatedDocuments(): Promise<void> {
    const docsToRefresh: Array<SquidDocId> = [...this.refreshDocIdToTimestamp.entries()]
      .filter(([docId, scheduledTimestamp]) => {
        const knownServerTimestamp = this.docIdToServerTimestamp.get(docId)?.timestamp;
        // An update from the server that arrived after the refresh was scheduled makes the refresh redundant.
        const isRedundant = knownServerTimestamp && knownServerTimestamp > scheduledTimestamp;
        return !isRedundant;
      })
      .map(([docId]) => docId);
    this.refreshDocIdToTimestamp.clear();

    const snapshotRequests = docsToRefresh.map(squidDocId =>
      this.queryBuilderFactory.getForDocument(squidDocId).snapshot(),
    );
    await Promise.allSettled(snapshotRequests);
  }

  /**
   * Collects, per integration id, the latest outgoing mutation of every document when that mutation was not sent
   * yet. Every collected mutation is marked as sent (sentToServer=true) as a side effect.
   */
  private groupOutgoingMutationsByIntegrationId(): Array<[IntegrationId, OutgoingMutation[]]> {
    const grouped: Record<IntegrationId, OutgoingMutation[]> = {};
    for (const outgoingMutations of this.pendingOutgoingMutations.values()) {
      const latest = outgoingMutations[outgoingMutations.length - 1];
      if (!latest || latest.sentToServer) continue;

      const { integrationId } = latest.mutation.squidDocIdObj;
      let group = grouped[integrationId];
      if (!group) {
        group = [];
        grouped[integrationId] = group;
      }
      group.push(latest);
      latest.sentToServer = true;
    }
    return Object.entries(grouped);
  }

  /**
   * Handles documents that became orphans due to some change (an incoming or outgoing change to a document), that
   * is, documents with no ongoing queries that will keep them up-to-date. An orphan document should not stay
   * locally since it may become stale, so every reported orphan that is not tracked is forgotten.
   */
  private handleOrphanDocs(): void {
    this.querySubscriptionManager.onOrphanDocuments.subscribe(orphanDocs => {
      for (const squidDocId of orphanDocs) {
        if (this.isTracked(squidDocId)) continue;
        this.forgetDocument(squidDocId);
      }
    });
  }

  /**
   * Records the server timestamp of the document, replacing any previous record.
   *
   * @param expires Whether the new record should be scheduled to expire; by default it is kept.
   */
  private acknowledgeDocument(squidDocId: SquidDocId, timestamp: number, expires = false): void {
    const metadata: DocTimestampMetadata = { timestamp };
    this.docIdToServerTimestamp.set(squidDocId, metadata);
    this.setExpiration(squidDocId, expires);
  }

  /**
   * Schedules the server-timestamp record of the document to expire 20 seconds from now, or cancels its expiration.
   * Does nothing when there is no record for the document.
   */
  private setExpiration(squidDocId: SquidDocId, expires: boolean): void {
    const metadata = this.docIdToServerTimestamp.get(squidDocId);
    if (!metadata) return;

    if (expires) {
      metadata.expireTimestamp = Date.now() + 20000;
    } else {
      metadata.expireTimestamp = undefined;
    }
  }

  /** Drops the local timestamp of the document and schedules its server-timestamp record to expire. */
  private forgetDocument(squidDocId: string): void {
    /**
     * The server timestamp is not removed right away: it is kept for a bit more just to make sure there is nothing
     * in-flight that will be accepted with the wrong timestamp.
     */
    this.setExpiration(squidDocId, true);

    this.docIdToLocalTimestamp.delete(squidDocId);
  }

  /**
   * Replaces document ids with their resolved ids: first inside the pending outgoing mutations themselves and then
   * in the keys of the pending outgoing mutations map and of the local and server timestamp maps.
   */
  private migrateDocIds(idResolutionMap: IdResolutionMap): void {
    for (const outgoingMutations of this.pendingOutgoingMutations.values()) {
      for (const { mutation } of outgoingMutations) {
        const resolvedId = idResolutionMap[getSquidDocId(mutation.squidDocIdObj)];
        if (resolvedId) {
          mutation.squidDocIdObj = parseSquidDocId(resolvedId);
        }
      }
    }

    for (const [previousDocId, resolvedDocId] of Object.entries(idResolutionMap)) {
      replaceKeyInMap(this.pendingOutgoingMutations, previousDocId, resolvedDocId);
      replaceKeyInMap(this.docIdToLocalTimestamp, previousDocId, resolvedDocId);
      replaceKeyInMap(this.docIdToServerTimestamp, previousDocId, resolvedDocId);
    }
  }

  /** Whether at least one pending outgoing mutation is marked as sent to the server. */
  private hasPendingSentMutations(): boolean {
    const pendingByDoc = [...this.pendingOutgoingMutations.values()];
    return pendingByDoc.some(outgoingMutations =>
      outgoingMutations.some(outgoingMutation => outgoingMutation.sentToServer),
    );
  }
}

/**
 * A unique marker for "no value was set". It is used to differentiate an `undefined` value provided by the user
 * (for example a thrown `undefined`) from the real unset state of a variable.
 */
const UNSET_VALUE = Symbol('undefined');
