import { truthy } from 'assertic';
import {
  BehaviorSubject,
  combineLatest,
  defer,
  delay,
  finalize,
  firstValueFrom,
  Observable,
  pairwise,
  race,
  ReplaySubject,
  share,
  startWith,
  Subject,
  switchMap,
  take,
  takeUntil,
  takeWhile,
  tap,
} from 'rxjs';
import { RpcManager } from '../rpc.manager';
import { filter, map } from 'rxjs/operators';
import { TrieStore as StateService } from 'otrie';
import { QuerySender } from './query-sender';
import { DocsAndAlias, LimitUnderflowState, OngoingQuery } from './query.types';
import {
  Alias,
  ClientRequestId,
  CollectionName,
  CompositeCondition,
  Condition,
  IntegrationId,
  isSimpleCondition,
  JoinCondition,
  Query,
  SquidDocId,
  SquidDocument,
} from '../public-types';
import { ClientIdService } from '../client-id.service';
import { DocumentStore } from '../document-store';
import { DestructManager } from '../destruct.manager';
import DocumentIdentityService from '../document-identity.service';
import { DebugLogger } from '../../../internal-common/src/utils/global.utils';
import { cloneDeep, getInPath, isEqual, replaceKeyInMap } from '../../../internal-common/src/utils/object';
import { deserializeObj, encodeValueForMapping } from '../../../internal-common/src/utils/serialization';
import { insertSorted, removeSorted } from '../../../internal-common/src/utils/array';
import { getSquidDocId, parseSquidDocId } from '../../../internal-common/src/types/document.types';
import { QueryResultData } from '../../../internal-common/src/types/socket.types';
import {
  compareOperator,
  encodeCondition,
  getQuerySubscriptionId,
  parseQuerySubscriptionId,
  QueryMapping,
  QueryRegisterRequest,
  QueryRequest,
  QuerySubscriptionId,
  QueryUnsubscribeRequest,
} from '../../../internal-common/src/types/query.types';
import { IdResolutionMap } from '../../../internal-common/src/types/mutation.types';
import { QueryContext } from '../../../internal-common/src/public-types-backend/query.public-context';

// See limitUnderflowState below.
// Exported only for tests
export const FETCH_BEYOND_LIMIT = 100;
export const LIMIT_UNDERFLOW_TRIGGER = 20;

/** @internal */
export class QuerySubscriptionManager {
  /**
   * An observable used by the data manager, the query subscription manager (this class) identifies when a document no
   * longer has queries that it is part of their result, such document is considered orphan. The data manager will mark
   * as orphan.
   */
  onOrphanDocuments = new Subject<Array<SquidDocId>>();

  /** All the currently running queries with their full state. */
  private readonly ongoingQueries = new Map<ClientRequestId, OngoingQuery>();

  /**
   * The two maps below maintain the relation between document ids we know about locally to clientRequestIds (queries).
   * This relation is used for determining whether a document can be safely removed.
   */
  private readonly clientRequestIdToLocalDocuments = new Map<ClientRequestId, Set<SquidDocId>>();
  private readonly localDocumentToClientRequestIds = new Map<SquidDocId, Set<ClientRequestId>>();

  /**
   * A data structure for mapping queries to allow reverse queries search (given a document, find all the matching
   * queries)
   */
  private readonly queryMappingManager = new ClientQueryMappingManager();

  private readonly queryResultsSubject = new Subject<QueryResultData>();

  constructor(
    private readonly rpcManager: RpcManager,
    private readonly clientIdService: ClientIdService,
    private readonly documentStore: DocumentStore,
    private readonly destructManager: DestructManager,
    private readonly documentIdentityService: DocumentIdentityService,
    private readonly querySender: QuerySender,
  ) {
    /**
     * In the case that a document ID changed on the server, this observable will notify us about the change, and we
     * will update the mapping.
     */
    this.documentIdentityService.observeChanges().subscribe(this.migrateDocIds.bind(this));

    /** In the case that the client ID changed, we should refresh all ongoing queries. */
    this.clientIdService.observeClientReadyToBeRegenerated().subscribe(() => {
      this.refreshOngoingQueries();
    });
    this.destructManager.onPreDestruct(() => {
      this.preDestruct();
    });
  }

  // noinspection JSUnusedGlobalSymbols
  dumpSubscriptionInfo(): void {
    console.log('Ongoing queries:', this.ongoingQueries);
    console.log('ClientRequestId to local documents:', this.clientRequestIdToLocalDocuments);
    console.log('Local documents to clientRequestId:', this.localDocumentToClientRequestIds);
  }

  observeQueryResults(): Observable<QueryResultData> {
    return this.queryResultsSubject.asObservable();
  }

  /**
   * Returns true if the client knows about this clientRequestId. It may happen that it will return false in the case
   * that the client unsubscribed from a query but the server sent a mutation update for this clientRequestId.
   */
  hasOngoingQuery(clientRequestId: ClientRequestId): boolean {
    return this.ongoingQueries.has(clientRequestId);
  }

  /**
   * Returns the query associated with the given clientRequestId. Throws error if the clientRequestId is not known.
   */
  getQuery(clientRequestId: ClientRequestId): Query {
    return truthy(this.ongoingQueries.get(clientRequestId), 'UNKNOWN_QUERY').query;
  }

  /**
   * A query receives updates from three different sources:
   * 1 - An initial snapshot from the server or from a parent query
   * 2 - Incremental updates from the server (or from a parent query before the query is registered)
   * 3 - A new snapshot if the query is refreshed after connection has been lost.
   *
   * If an incremental update is received before the snapshot was received, we cannot process it for this query.
   * This boolean indicates whether the initial snapshot was received.
   */
  setGotInitialResult(clientRequestId: ClientRequestId): void {
    const ongoingQuery = this.ongoingQueries.get(clientRequestId);
    /**
     * If this query already got initial result from the server, it means that the query is able to accept incremental
     * updates. In this case we should clear the mapping we have for this clientRequestId since a new updated mapping
     * will be created when applying the update.
     */
    if (ongoingQuery?.gotInitialResponse) {
      this.removeClientRequestIdMapping(clientRequestId);
    }

    if (ongoingQuery) {
      ongoingQuery.gotInitialResponse = true;
      ongoingQuery.isInFlight = false;
    }
  }

  /** Given a document, returns all the queries that should be notified with the new document properties. */
  findQueriesForDocument(doc: SquidDocument, squidDocId: SquidDocId): Array<QuerySubscriptionId> {
    const { collectionName, integrationId } = parseSquidDocId(squidDocId);
    const mapping: QueryMapping<QuerySubscriptionId> | undefined = this.queryMappingManager.getMapping(
      collectionName,
      integrationId,
    );
    if (!mapping) return [];
    return findQueriesForDocumentSync(mapping, doc);
  }

  /**
   * Given the new document's properties, finds all the queries that should be notified with the new properties and
   * updates the internal mappings (squidDocId --> client request Ids and, clientRequestId --> squidDocIds).
   * Returns an array with all the previous and current client request ids (basically all the client request ids that
   * will need to be notified due to the change of properties).
   */
  setClientRequestIdsForLocalDoc(
    squidDocId: SquidDocId,
    properties: SquidDocument | undefined,
  ): Array<ClientRequestId> {
    const clientRequestIdsBefore = this.localDocumentToClientRequestIds.get(squidDocId) || new Set();
    const clientRequestIdsAfter = new Set(
      properties
        ? this.findQueriesForDocument(properties, squidDocId).map(
            querySubscriptionId => parseQuerySubscriptionId(querySubscriptionId).clientRequestId,
          )
        : [],
    );
    const allClientRequestIds = new Set([...clientRequestIdsBefore, ...clientRequestIdsAfter]);
    for (const clientRequestIdBefore of [...clientRequestIdsBefore]) {
      if (clientRequestIdsAfter.has(clientRequestIdBefore)) continue;
      clientRequestIdsBefore.delete(clientRequestIdBefore);
      const localDocsBefore = this.clientRequestIdToLocalDocuments.get(clientRequestIdBefore);
      if (localDocsBefore) {
        localDocsBefore.delete(squidDocId);
        if (!localDocsBefore.size) {
          this.clientRequestIdToLocalDocuments.delete(clientRequestIdBefore);
        }
      }
      if (!clientRequestIdsBefore.size) {
        this.localDocumentToClientRequestIds.delete(squidDocId);
      }
    }

    for (const clientRequestIdAfter of clientRequestIdsAfter) {
      let clientRequestIds = this.localDocumentToClientRequestIds.get(squidDocId);
      if (!clientRequestIds) {
        clientRequestIds = new Set<ClientRequestId>();
        this.localDocumentToClientRequestIds.set(squidDocId, clientRequestIds);
      }
      clientRequestIds.add(clientRequestIdAfter);

      let localDocuments = this.clientRequestIdToLocalDocuments.get(clientRequestIdAfter);
      if (!localDocuments) {
        localDocuments = new Set<SquidDocId>();
        this.clientRequestIdToLocalDocuments.set(clientRequestIdAfter, localDocuments);
      }
      localDocuments.add(squidDocId);
    }
    return [...allClientRequestIds];
  }

  /**
   * Due to an error when syncing a document, all the queries that are subscribed to this document should be notified
   * and error out.
   */
  errorOutAllQueries(squidDocId: SquidDocId, err: unknown): void {
    const clientRequestIds = this.localDocumentToClientRequestIds.get(squidDocId) || new Set();
    for (const clientRequestId of clientRequestIds) {
      const ongoingQuery = this.ongoingQueries.get(clientRequestId);
      if (!ongoingQuery) continue;
      if (!this.destructManager.isDestructing) {
        ongoingQuery.dataSubject.error(err);
      } else {
        ongoingQuery.dataSubject.complete();
      }
      ongoingQuery.done = true;
      // TODO handle joins
    }
  }

  /** Notifies to all the given queries (identified by their clientRequestId) with the updated query result. */
  notifyAllSubscriptions(clientRequestIds: ClientRequestId[]): void {
    const rootOngoingQueries = new Set<OngoingQuery>();
    for (const clientRequestId of clientRequestIds) {
      const ongoingQuery = this.ongoingQueries.get(clientRequestId);
      if (!ongoingQuery) continue;
      if (!ongoingQuery.gotInitialResponse || !ongoingQuery.activated || ongoingQuery.isInFlight) continue;
      const docIdSet = this.clientRequestIdToLocalDocuments.get(clientRequestId) || new Set();
      const result = this.documentStore.sortAndLimitDocs(docIdSet, ongoingQuery.query);
      const observablesUpdated = ongoingQuery.supportedQueries
        .map(supportedOngoingQuery =>
          this.updateOngoingQueryWithNewDataFromSupportingQuery(result, supportedOngoingQuery),
        )
        .some(Boolean);

      let rootOngoingQuery = ongoingQuery;
      while (!rootOngoingQuery.allObservables) {
        rootOngoingQuery = truthy(rootOngoingQuery?.supportingOngoingQuery);
      }
      if (observablesUpdated) {
        rootOngoingQueries.add(rootOngoingQuery);
      }
      if (ongoingQuery.query.limit > 0) {
        switch (ongoingQuery.limitUnderflowState) {
          case LimitUnderflowState.UNKNOWN:
            ongoingQuery.limitUnderflowState =
              docIdSet.size === ongoingQuery.query.limit + FETCH_BEYOND_LIMIT
                ? LimitUnderflowState.ENABLED
                : LimitUnderflowState.DISABLED;
            break;

          case LimitUnderflowState.DISABLED:
            break;

          case LimitUnderflowState.ENABLED:
            if (docIdSet.size < ongoingQuery.query.limit + LIMIT_UNDERFLOW_TRIGGER) {
              ongoingQuery.limitUnderflowState = LimitUnderflowState.UNKNOWN;
              this.sendQueryToServerOrUseParentQuery(ongoingQuery);
              continue;
            }
        }
      }
      ongoingQuery.dataSubject.next(result);
    }

    for (const rootOngoingQuery of rootOngoingQueries) {
      const allObservables = this.collectAllObservables(rootOngoingQuery);
      truthy(rootOngoingQuery.allObservables).next(allObservables);
    }
  }

  private isValidParent(candidateParentQuery: OngoingQuery): boolean {
    if (
      !candidateParentQuery.activated ||
      candidateParentQuery.isInFlight ||
      candidateParentQuery.isEmptyForJoin ||
      candidateParentQuery.done ||
      !candidateParentQuery.subscribe ||
      !candidateParentQuery.gotInitialResponse ||
      !candidateParentQuery.dataSubject.value
    ) {
      return false;
    }

    const limit = candidateParentQuery.query.limit === -1 ? 1000 : candidateParentQuery.query.limit;

    /**
     * If the parent query has more or equal number results to the given limit - it may be that the sub-query
     * may need to see different result from what the parent sees.
     */
    return candidateParentQuery.dataSubject.value.length < limit;
  }

  /**
   * Given an ongoing query, search for candidate ongoing queries that can serve as a parent.
   * If there is a parent query, the result of that query can be used for serving the current query.
   * We will still register the current query on the server, but we do not need to run the query, apply security rules,
   * etc.
   */
  private findValidParentOfOngoingQuery(ongoingQuery: OngoingQuery): OngoingQuery | undefined {
    if (ongoingQuery.forceFetchFromServer) return undefined;
    const qc = new QueryContext(ongoingQuery.query);

    for (const candidateParentQuery of this.ongoingQueries.values()) {
      if (ongoingQuery === candidateParentQuery) {
        // If the query is currently ongoing, we should not assign it a parent.
        return undefined;
      } else if (!this.isValidParent(candidateParentQuery)) {
        continue;
      } else if (qc.isSubqueryOfQuery(candidateParentQuery.query)) {
        return candidateParentQuery;
      }
    }
    return undefined;
  }

  findValidParentOfQuery(query: Query): OngoingQuery | undefined {
    const qc = new QueryContext(query);
    for (const candidateParentQuery of this.ongoingQueries.values()) {
      if (!this.isValidParent(candidateParentQuery)) {
        continue;
      }
      if (qc.isSubqueryOfQuery(candidateParentQuery.query)) {
        return candidateParentQuery;
      }
    }
    return undefined;
  }

  processQuery(
    query: Query,
    rootAlias: Alias,
    joins: Record<string, Query>,
    // Map from the right alias to the join condition with the left alias
    joinConditions: Record<Alias, JoinCondition>,
    subscribe: boolean,
    forceFetchFromServer: boolean,
  ): Observable<Array<Record<Alias, SquidDocument | undefined>>> {
    // We defer this, because we want it to re-run the query if all previous subscribers unsubscribed, and there's a
    // new subscriber
    return defer(() => {
      const rootOngoingQuery = this.createOngoingQueryGraph(query, rootAlias, joins, joinConditions, subscribe, true);
      if (forceFetchFromServer) {
        rootOngoingQuery.forceFetchFromServer = true;
      }
      this.sendQueryToServerOrUseParentQuery(rootOngoingQuery);
      rootOngoingQuery.allObservables = new ReplaySubject<Array<Observable<DocsAndAlias>>>(1);
      const result = rootOngoingQuery.allObservables.pipe(
        switchMap(allObservables =>
          combineLatest(allObservables).pipe(
            map(allResults => this.joinResults(allResults, joinConditions, rootOngoingQuery)),
          ),
        ),
        filter(() => this.allOngoingQueriesGotInitialResult(rootOngoingQuery)),
        startWith(undefined),
        pairwise(),
        filter(([before, after]) => !isEqual(before, after)),
        map(([, after]) => after),
        // This handles 'subscribe = false'
        subscribe ? tap() : take(1),
        finalize(() => {
          rootOngoingQuery.dataSubject.complete();
          rootOngoingQuery.done = true;
          this.completeAllSupportedQueries(rootOngoingQuery).then();
          rootOngoingQuery.allObservables?.complete();
        }),
      );
      const allObservables: Array<Observable<DocsAndAlias>> = this.collectAllObservables(rootOngoingQuery);
      rootOngoingQuery.allObservables.next(allObservables);
      return result;
    }).pipe(share<any>());
  }

  /**
   * Returns whether the given document ID has a query that has this document ID as a result.
   * A document without a query is considered "un-tracked".
   */
  hasOngoingQueryForDocId(squidDocId: string): boolean {
    const clientRequestIds = this.localDocumentToClientRequestIds.get(squidDocId);
    return !!clientRequestIds && !!clientRequestIds.size;
  }

  /**
   * Removes a query from the mapping and updates the orphan documents as needed.
   */
  private removeClientRequestIdMapping(clientRequestId: string): void {
    const docs = this.clientRequestIdToLocalDocuments.get(clientRequestId);
    if (!docs) return;
    this.clientRequestIdToLocalDocuments.delete(clientRequestId);
    const orphanDocument: Array<SquidDocId> = [];
    for (const doc of docs) {
      const clientRequestIds = truthy(this.localDocumentToClientRequestIds.get(doc));
      clientRequestIds.delete(clientRequestId);
      if (!clientRequestIds.size) {
        this.localDocumentToClientRequestIds.delete(doc);
        orphanDocument.push(doc);
      }
    }
    if (orphanDocument.length) {
      this.onOrphanDocuments.next(orphanDocument);
    }
  }

  /** Register logic for cleaning up the query when it is unsubscribed. */
  private registerQueryFinalizer(ongoingQuery: OngoingQuery): void {
    const clientRequestId = ongoingQuery.clientRequestId;
    const querySubscriptionId = getQuerySubscriptionId(this.clientIdService.getClientId(), clientRequestId);
    ongoingQuery.dataSubject
      .pipe(
        finalize(async () => {
          if (ongoingQuery.unsubscribeBlockerCount.value > 0) {
            await firstValueFrom(
              race(
                this.destructManager.observeIsDestructing(),
                ongoingQuery.unsubscribeBlockerCount.pipe(filter(count => count === 0)),
              ),
            );
          }
          this.queryMappingManager.removeQuery(querySubscriptionId).then();
          this.ongoingQueries.delete(clientRequestId);
          if (ongoingQuery.subscribe) {
            if (!ongoingQuery.isEmptyForJoin && ongoingQuery.activated) {
              const unsubscribeRequest: QueryUnsubscribeRequest = {
                clientRequestId,
              };
              this.rpcManager.post('query/unsubscribe', unsubscribeRequest).catch(e => {
                if (this.destructManager.isDestructing) return;
                console.error('Got error while unsubscribing from query', ongoingQuery.query, e);
              });
            }
          }
          this.removeClientRequestIdMapping(clientRequestId);
          this.ongoingQueries.delete(clientRequestId);
        }),
        filter(Boolean),
      )
      .subscribe({
        error: () => {
          // Drop errors here
        },
      });
  }

  /** Creates a graph of ongoing queries and returns the root of the graph. */
  private createOngoingQueryGraph(
    query: Query,
    alias: Alias,
    joins: Record<Alias, Query>,
    joinConditionsMap: Record<Alias, JoinCondition>,
    subscribe: boolean,
    isRoot: boolean,
    createdOngoingQueries: Record<Alias, OngoingQuery> = {},
  ): OngoingQuery {
    if (createdOngoingQueries[alias]) return createdOngoingQueries[alias];
    const clientRequestId = this.clientIdService.generateClientRequestId();
    const supportedQueries: Array<OngoingQuery> = [];
    const result: OngoingQuery = {
      clientRequestId,
      activated: isRoot,
      alias,
      query,
      subscribe,
      dataSubject: new BehaviorSubject<Array<SquidDocument> | null>(null),
      supportedQueries,
      supportingOngoingQuery: undefined,
      joinCondition: undefined,
      gotInitialResponse: false,
      isEmptyForJoin: false,
      canExpandForJoin: true,
      unsubscribeBlockerCount: new BehaviorSubject<number>(0),
      queryRegistered: new BehaviorSubject<boolean>(false),
      done: false,
      isInFlight: false,
      forceFetchFromServer: false,
      limitUnderflowState: subscribe ? LimitUnderflowState.UNKNOWN : LimitUnderflowState.DISABLED,
    };
    this.registerQueryFinalizer(result);
    this.ongoingQueries.set(clientRequestId, result);
    createdOngoingQueries[alias] = result;
    for (const [rightAlias, joinCondition] of Object.entries(joinConditionsMap)) {
      const leftAlias = joinCondition.leftAlias;
      if (leftAlias !== alias && rightAlias !== alias) continue;
      const chosenAlias = leftAlias === alias ? rightAlias : leftAlias;
      if (leftAlias === alias) {
        const supportedQuery = this.createOngoingQueryGraph(
          joins[chosenAlias],
          chosenAlias,
          joins,
          joinConditionsMap,
          subscribe,
          false,
          createdOngoingQueries,
        );
        supportedQuery.joinCondition = joinCondition;
        supportedQueries.push(supportedQuery);
      } else {
        result.supportingOngoingQuery = this.createOngoingQueryGraph(
          joins[chosenAlias],
          chosenAlias,
          joins,
          joinConditionsMap,
          subscribe,
          false,
          createdOngoingQueries,
        );
      }
    }
    return result;
  }

  private collectAllObservables(
    ongoingQuery: OngoingQuery,
    result: Array<Observable<DocsAndAlias>> = [],
  ): Array<Observable<DocsAndAlias>> {
    if (ongoingQuery.isEmptyForJoin) {
      return result;
    }
    const alias = ongoingQuery.alias;
    result.push(
      ongoingQuery.dataSubject.pipe(
        filter(Boolean),
        map(docs => {
          return { docs, alias };
        }),
      ),
    );
    for (const supportedQuery of ongoingQuery.supportedQueries) {
      this.collectAllObservables(supportedQuery, result);
    }
    return result;
  }

  private joinResults(
    allResults: Array<DocsAndAlias>,
    joinConditions: Record<Alias, JoinCondition>,
    rootOngoingQuery: OngoingQuery,
  ): Array<Record<Alias, SquidDocument | undefined>> {
    const aliasToDocs = allResults.reduce(
      (accum, docsAndAlias) => {
        if (accum[docsAndAlias.alias]) {
          accum[docsAndAlias.alias].push(...docsAndAlias.docs);
        } else {
          accum[docsAndAlias.alias] = [...docsAndAlias.docs];
        }
        return accum;
      },
      {} as Record<Alias, Array<SquidDocument>>,
    );

    let result: Array<Record<Alias, SquidDocument | undefined>> = aliasToDocs[rootOngoingQuery.alias].map(doc => ({
      [rootOngoingQuery.alias]: doc,
    }));
    const ongoingQueriesInOrder = this.getOngoingQueriesBfs(rootOngoingQuery);
    const visitedAliases = new Set<Alias>();
    // Skipping the root since it is already part of the result.
    for (let i = 1; i < ongoingQueriesInOrder.length; i++) {
      const ongoingQuery = ongoingQueriesInOrder[i];
      const rightAlias = ongoingQuery.alias;
      if (visitedAliases.has(rightAlias)) continue;
      visitedAliases.add(rightAlias);
      result = this.join(result, rightAlias, aliasToDocs[rightAlias], joinConditions[rightAlias]);
    }
    return result;
  }

  private join(
    left: Array<Record<Alias, SquidDocument | undefined>>,
    rightAlias: Alias,
    rightDocs: Array<SquidDocument> | undefined,
    joinCondition: JoinCondition,
  ): Array<Record<Alias, SquidDocument | undefined>> {
    if (!left.length) return left;
    const availableLeftAliases = Object.keys(left[0]);
    if (!(joinCondition && availableLeftAliases.includes(joinCondition.leftAlias))) {
      throw new Error('No join condition found for alias ' + rightAlias);
    }
    const rightAsMap = new Map<any, Array<SquidDocument>>();

    (rightDocs || []).forEach(doc => {
      const val = this.transformKey(doc[joinCondition.right]);
      if (!rightAsMap.has(val)) rightAsMap.set(val, []);
      truthy(rightAsMap.get(val)).push(doc);
    });

    return left.flatMap(leftElement => {
      const rightDocsWithSameValue =
        rightAsMap.get(this.transformKey(leftElement[joinCondition.leftAlias]?.[joinCondition.left])) || [];
      if (rightDocsWithSameValue.length) {
        return rightDocsWithSameValue.map(rightDoc => ({ ...leftElement, [rightAlias]: rightDoc }));
      } else if (joinCondition.isInner) {
        return [];
      } else {
        return [{ ...leftElement, [rightAlias]: undefined }];
      }
    });
  }

  private getOngoingQueriesBfs(rootOngoingQuery: OngoingQuery): Array<OngoingQuery> {
    const result: Array<OngoingQuery> = [];
    const queue: Array<OngoingQuery> = [rootOngoingQuery];
    while (queue.length) {
      const current = truthy(queue.shift());
      if (current.isEmptyForJoin) continue;
      result.push(current);
      queue.push(...current.supportedQueries);
    }
    return result;
  }

  private updateOngoingQueryWithNewDataFromSupportingQuery(
    supportingQueryResult: Array<SquidDocument>,
    supportedOngoingQuery: OngoingQuery,
  ): boolean {
    const joinCondition = truthy(supportedOngoingQuery.joinCondition);
    const query = supportedOngoingQuery.query;
    if (!supportedOngoingQuery.activated) {
      supportedOngoingQuery.activated = true;
      const existingCondValues = query.conditions
        .filter(isSimpleCondition)
        .filter(cond => {
          return cond.fieldName === joinCondition.right && cond.operator === '==';
        })
        .map(cond => cond.value);

      if (existingCondValues.length) {
        this.sendQueryToServerOrUseParentQuery(supportedOngoingQuery);
        supportedOngoingQuery.canExpandForJoin = false;
        return true;
      }
      const newConditions = supportingQueryResult
        .map(supportingDoc => supportingDoc[joinCondition.left] ?? null)
        .map<Condition>(value => {
          return {
            fieldName: joinCondition.right,
            operator: '==',
            value,
          };
        });

      if (newConditions.length) {
        query.conditions.push(...newConditions);
        this.sendQueryToServerOrUseParentQuery(supportedOngoingQuery);
      } else {
        // The supported query should not be executed since there are no values to match
        supportedOngoingQuery.isEmptyForJoin = true;
      }
      return true;
    } else {
      if (!supportedOngoingQuery.canExpandForJoin) {
        return false;
      }
      const supportedQueriesWithSameAlias = truthy(
        supportedOngoingQuery.supportingOngoingQuery?.supportedQueries,
      ).filter(q => q.alias === supportedOngoingQuery.alias);
      const allNeededValues = new Set(supportingQueryResult.map(resultDoc => resultDoc[joinCondition.left] ?? null));
      for (const supportedQuery of supportedQueriesWithSameAlias) {
        supportedQuery.query.conditions
          .filter(isSimpleCondition)
          .filter(cond => cond.fieldName === joinCondition.right)
          .forEach(cond => {
            allNeededValues.delete(cond.value);
          });
      }
      if (allNeededValues.size === 0) {
        return false;
      }
      const newQuery = cloneDeep(query);
      newQuery.conditions = newQuery.conditions.filter(
        cond => !isSimpleCondition(cond) || cond.fieldName !== joinCondition.right,
      );
      [...allNeededValues].forEach(value => {
        newQuery.conditions.push({
          fieldName: joinCondition.right,
          operator: '==',
          value,
        });
      });
      const ongoingQuery: OngoingQuery = {
        ...supportedOngoingQuery,
        query: newQuery,
        activated: true,
        gotInitialResponse: false,
        dataSubject: new BehaviorSubject<Array<SquidDocument> | null>(null),
        clientRequestId: this.clientIdService.generateClientRequestId(),
        isEmptyForJoin: false,
      };
      this.registerQueryFinalizer(ongoingQuery);
      this.ongoingQueries.set(ongoingQuery.clientRequestId, ongoingQuery);
      truthy(supportedOngoingQuery.supportingOngoingQuery).supportedQueries.push(ongoingQuery);
      this.sendQueryToServerOrUseParentQuery(ongoingQuery);
      return true;
    }
  }

  private allOngoingQueriesGotInitialResult(rootOngoingQuery: OngoingQuery): boolean {
    if (rootOngoingQuery.isEmptyForJoin) return true;
    if (!rootOngoingQuery.gotInitialResponse) return false;
    if (!rootOngoingQuery.supportedQueries.length) return true;
    return rootOngoingQuery.supportedQueries.every(ongoingQuery =>
      this.allOngoingQueriesGotInitialResult(ongoingQuery),
    );
  }

  private async completeAllSupportedQueries(rootOngoingQuery: OngoingQuery): Promise<void> {
    const supportedQueries = [...(rootOngoingQuery.supportedQueries || [])];
    while (supportedQueries.length) {
      const supportedQuery = truthy(supportedQueries.shift());
      supportedQueries.push(...(supportedQuery.supportedQueries || []));
      await firstValueFrom(supportedQuery.unsubscribeBlockerCount.pipe(filter(count => count === 0)));
      supportedQuery.dataSubject.complete();
    }
  }

  // Transforms a key to be used inside a map when determining overlaps in a join. If the key is a Date, it returns a
  // string representation of the date. This is needed since objects (Date type is an object) used as keys in maps are
  // compared by reference. Therefore, using a new Date object in a map will not match to any value, even if a key
  // exists which has the equivalent value. Returns the input as is if it is not a date.
  private transformKey(key: any): any {
    if (key instanceof Date) {
      return `DATE AS string KEY: ${key.toISOString()}`;
    }
    return key;
  }

  private preDestruct(): void {
    this.unsubscribe();
  }

  unsubscribe(): void {
    const ongoingQueries = [...this.ongoingQueries.values()];
    for (const ongoingQuery of ongoingQueries) {
      ongoingQuery.dataSubject.complete();
      ongoingQuery.allObservables?.complete();
    }
  }

  hasSubscription(clientRequestId: ClientRequestId): boolean {
    return !!this.ongoingQueries.get(clientRequestId)?.subscribe;
  }

  /** Sends the query request to the server and makes sure to unsubscribe once the subject completes. */
  private sendQueryToServerOrUseParentQuery(ongoingQuery: OngoingQuery, forceSendToServer = false): void {
    if (this.destructManager.isDestructing) return;

    const query = ongoingQuery.query;
    const clientRequestId = ongoingQuery.clientRequestId;
    const querySubscriptionId = getQuerySubscriptionId(this.clientIdService.getClientId(), clientRequestId);
    this.queryMappingManager.addQuery(query, querySubscriptionId);

    this.ongoingQueries.set(clientRequestId, ongoingQuery);
    const parentOngoingQuery = forceSendToServer ? undefined : this.findValidParentOfOngoingQuery(ongoingQuery);
    if (parentOngoingQuery) {
      this.useParentOngoingQuery(ongoingQuery, parentOngoingQuery).then();
    } else {
      this.sendQueryToServer(ongoingQuery);
    }
  }

  /**
   * Uses the parent query as the source of data for the given ongoing query until the ongoing query is registered on
   * the server. It prevents the parent query from being unsubscribed until the query is registered on the server and
   * the first snapshot is received from the parent query.
   * 1 - Prevents the parent query from being unsubscribed
   * 2 - Connects the results of the parent query to the result of the current ongoing query
   * 3 - Registers the query on the server
   */
  private async useParentOngoingQuery(ongoingQuery: OngoingQuery, parentOngoingQuery: OngoingQuery): Promise<void> {
    const queryRegisterRequest: QueryRegisterRequest = {
      clientRequestId: ongoingQuery.clientRequestId,
      query: ongoingQuery.query,
      parentClientRequestId: parentOngoingQuery.clientRequestId,
    };

    const queryContext = new QueryContext(ongoingQuery.query);
    parentOngoingQuery.unsubscribeBlockerCount.next(parentOngoingQuery.unsubscribeBlockerCount.value + 1);

    // Wait for the parent query to be registered on the server
    try {
      await firstValueFrom(parentOngoingQuery.queryRegistered.pipe(filter(Boolean)));
    } catch (e) {
      if (!this.destructManager.isDestructing) {
        ongoingQuery.dataSubject.error(e);
        ongoingQuery.queryRegistered.error(e);
      } else {
        ongoingQuery.dataSubject.complete();
        ongoingQuery.queryRegistered.complete();
      }
      ongoingQuery.done = true;
      return;
    }
    if (this.destructManager.isDestructing) return;

    if (ongoingQuery.done) {
      return;
    }

    this.rpcManager
      .post<boolean>('query/register', queryRegisterRequest)
      .then(() => {
        ongoingQuery.isInFlight = false;
        ongoingQuery.queryRegistered.next(true);
      })
      .catch(e => {
        ongoingQuery.isInFlight = false;
        if (!this.destructManager.isDestructing) {
          console.error('Query error', ongoingQuery.query, parentOngoingQuery.query, e);
          ongoingQuery.dataSubject.error(e);
        } else {
          ongoingQuery.dataSubject.complete();
        }
        ongoingQuery.done = true;
      })
      .finally(() => {
        parentOngoingQuery.unsubscribeBlockerCount.next(parentOngoingQuery.unsubscribeBlockerCount.value - 1);
      });

    const takeUntilNotifier = race(
      /**
       * There may be some race condition between the time the query is registered and when the last mutation was sent
       * from the server. Adding a delay of 2 seconds here to make sure any update that the server sent will be
       * reflected in the query.
       */
      ongoingQuery.queryRegistered.pipe(filter(Boolean), delay(2000), take(1)),
      this.destructManager.observeIsDestructing().pipe(take(1)),
    );
    parentOngoingQuery.dataSubject
      .pipe(
        takeWhile(() => {
          return !ongoingQuery.done;
        }),
        takeUntil(takeUntilNotifier),
        filter(Boolean),
        tap(() => {
          if (ongoingQuery.gotInitialResponse) return;
          this.setGotInitialResult(ongoingQuery.clientRequestId);
        }),
        map(documents => documents.filter(doc => queryContext.documentMatchesQuery(doc))),
      )
      .subscribe({
        next: results => {
          for (const result of results) {
            this.setClientRequestIdsForLocalDoc(
              getSquidDocId(result.__docId__, ongoingQuery.query.collectionName, ongoingQuery.query.integrationId),
              result,
            );
          }
          this.notifyAllSubscriptions([ongoingQuery.clientRequestId]);
        },
        error: e => {
          if (!this.destructManager.isDestructing) {
            ongoingQuery.dataSubject.error(e);
          } else {
            ongoingQuery.dataSubject.complete();
          }
        },
      });
  }

  /**
   * Sends the /query request to the server. It:
   * 1 - Waits for when it is safe to send a query to the server (no in-flight mutations)
   * 2 - Increments the number of inflightQueriesCount to prevent parallel mutations
   * 3 - Handles errors
   * 4 - Marks the query as registered
   */
  private sendQueryToServer(ongoingQuery: OngoingQuery): void {
    const oldLimit = ongoingQuery.query.limit;
    const newLimit = oldLimit > 0 && ongoingQuery.subscribe ? oldLimit + FETCH_BEYOND_LIMIT : oldLimit;

    const queryRequest: QueryRequest = {
      query: { ...ongoingQuery.query, limit: newLimit },
      clientRequestId: ongoingQuery.clientRequestId,
      subscribe: ongoingQuery.subscribe,
    };

    ongoingQuery.isInFlight = true;

    this.querySender
      .sendQuery(queryRequest)
      .then(queryResult => {
        ongoingQuery.isInFlight = false;
        ongoingQuery.queryRegistered.next(true);
        this.queryResultsSubject.next(queryResult);
      })
      .catch(e => {
        ongoingQuery.isInFlight = false;
        if (!this.destructManager.isDestructing) {
          DebugLogger.debug('Query error', ongoingQuery.query, e);
          ongoingQuery.dataSubject.error(e);
          ongoingQuery.queryRegistered.error('query failed');
        } else {
          ongoingQuery.dataSubject.complete();
          ongoingQuery.queryRegistered.complete();
        }
        ongoingQuery.done = true;
      });
  }

  /** naive way to refresh queries/subscriptions when we have a new client id */
  private refreshOngoingQueries(): void {
    for (const query of this.ongoingQueries.values()) {
      /**
       * During the refresh, we want to send the query to the server and not use a potential parent query.
       * This is done because the client may believe that the query is a child of another query but that parent was
       * already removed by the server
       */
      this.sendQueryToServerOrUseParentQuery(query, true);
    }
    return;
  }

  private migrateDocIds(idResolutionMap: IdResolutionMap): void {
    const squidDocIds = Object.keys(idResolutionMap);

    for (const set of this.clientRequestIdToLocalDocuments.values()) {
      squidDocIds.forEach(key => {
        if (set.has(key)) {
          set.delete(key);
          set.add(idResolutionMap[key]);
        }
      });
    }
    squidDocIds.forEach(key => {
      replaceKeyInMap(this.localDocumentToClientRequestIds, key, idResolutionMap[key]);
    });
  }
}

/** @internal */
function matchesCompositeCondition(condition: CompositeCondition, doc: SquidDocument): boolean {
  // We treat a composite condition of the form "a > 1, b < 2" as
  //   a > 1
  //   OR a == 1 AND b < 2
  // I.e. a document matches the composite condition if it is later than the field values in the condition, according
  // to the lexical sort defined by the condition's operators.

  for (const subCondition of condition.fields) {
    const valueInDocument = getInPath(doc, subCondition.fieldName) ?? null;
    if (compareOperator(subCondition.value, valueInDocument, subCondition.operator)) {
      return true;
    }
    if (compareOperator(subCondition.value, valueInDocument, '!=')) {
      return false;
    }

    // Otherwise, the value is equal to valueInDocument, so we go on to the next subCondition.
  }

  // All values are equal, so false.
  return false;
}

/** @internal */
function findQueriesForDocumentSync(
  mapping: QueryMapping<QuerySubscriptionId>,
  doc: SquidDocument,
): QuerySubscriptionId[] {
  // Adding all the unconditional mapped ids.
  const result = [...(mapping.unconditional || [])];

  // Mapping form query subscription id and the number of matched conditions for "doc". If the condition count is equal
  // to the condCount in the mapping metadata, this should be in the result.
  const mappedIdFoundMap = new Map<QuerySubscriptionId, number>();

  for (const [encodedCondition, queryIds] of Object.entries(mapping.conditional || {})) {
    const condition = deserializeObj<Condition>(encodedCondition);

    let matchesCondition: boolean;
    if (isSimpleCondition(condition)) {
      const valueInDocument = getInPath(doc, condition.fieldName) ?? null;
      matchesCondition = compareOperator(condition.value, valueInDocument, condition.operator);
    } else {
      matchesCondition = matchesCompositeCondition(condition, doc);
    }

    if (matchesCondition) {
      for (const mappedId of queryIds) {
        mappedIdFoundMap.set(mappedId, (mappedIdFoundMap.get(mappedId) || 0) + 1);
      }
    }
  }

  // The result includes all the mapped ids that have all the conditions matched.
  for (const [mappedId, matchCount] of mappedIdFoundMap.entries()) {
    if (matchCount >= mapping.queriesMetadata[mappedId].condCount) {
      result.push(mappedId);
    }
  }
  return result;
}

class ClientQueryMappingManager {
  private readonly stateService = new StateService({});
  private readonly querySubscriptionIdToQuery: Record<QuerySubscriptionId, Query> = {};

  addQuery(query: Query, querySubscriptionId: QuerySubscriptionId): void {
    this.stateService.runInBatch(() => {
      let condCount = 0;
      const visitedEqualityFields = new Set<string>();
      for (const condition of query.conditions) {
        // Handle condCount. The special case is that we only count equality conditions once since multiple '=='
        // are treated as 'in'.
        if (isSimpleCondition(condition) && ['=='].includes(condition.operator)) {
          const fieldName = encodeValueForMapping(condition.fieldName);
          if (!visitedEqualityFields.has(fieldName)) {
            condCount++;
            visitedEqualityFields.add(fieldName);
          }
        } else {
          condCount++;
        }

        const conditionPath = this.getConditionStatePath(query, condition);
        const clientRequestIdsForCondition = [...(this.stateService.get<string[]>(conditionPath) || [])];
        insertSorted(clientRequestIdsForCondition, querySubscriptionId);
        this.stateService.set(conditionPath, clientRequestIdsForCondition);
      }

      if (!query.conditions.length) {
        const path = ['queryMapping', query.collectionName, query.integrationId, 'mapping', 'unconditional'];
        const currentArray = [...(this.stateService.get<string[]>(path) || [])];
        insertSorted(currentArray, querySubscriptionId);
        this.stateService.set(path, currentArray);
      }
      this.stateService.set([...this.getQueryMetadataStatePath(query, querySubscriptionId), 'condCount'], condCount);
    });
    this.querySubscriptionIdToQuery[querySubscriptionId] = query;
  }

  async removeQuery(querySubscriptionId: QuerySubscriptionId): Promise<Query | undefined> {
    const query = this.querySubscriptionIdToQuery[querySubscriptionId];
    if (!query) return;
    this.stateService.runInBatch(() => {
      for (const cond of query.conditions) {
        const path = this.getConditionStatePath(query, cond);
        const currentArray = [...(this.stateService.get<string[]>(path) || [])];
        removeSorted(currentArray, querySubscriptionId);
        if (currentArray.length) {
          this.stateService.set(path, currentArray);
        } else {
          this.stateService.delete(path);
        }
      }
      if (!query.conditions.length) {
        const path = ['queryMapping', query.collectionName, query.integrationId, 'mapping', 'unconditional'];
        const currentArray = [...(this.stateService.get<string[]>(path) || [])];
        removeSorted(currentArray, querySubscriptionId);
        this.stateService.set(path, currentArray);
      }
      this.stateService.delete(this.getQueryMetadataStatePath(query, querySubscriptionId));
    });
    return query;
  }

  getMapping(
    collectionName: CollectionName,
    integrationId: IntegrationId,
  ): QueryMapping<QuerySubscriptionId> | undefined {
    return this.stateService.get<QueryMapping<QuerySubscriptionId>>([
      'queryMapping',
      collectionName,
      integrationId,
      'mapping',
    ]);
  }

  private getQueryMetadataStatePath(query: Query, clientRequestId: string): string[] {
    // Example path: queryMapping.a/b/c.mapping.queriesMetadata.UUID_HERE.condCount
    return [
      'queryMapping',
      query.collectionName,
      query.integrationId,
      'mapping',
      'queriesMetadata',
      `${clientRequestId}`,
    ];
  }

  private getConditionStatePath(query: Query, cond: Condition): string[] {
    return ['queryMapping', query.collectionName, query.integrationId, 'mapping', 'conditional', encodeCondition(cond)];
  }
}
