import { filter } from 'rxjs';
import { aggregate, aggregateDummy } from './aggregation';
import { CabinetCRDT, CabinetCRDTOptions, Transaction, TransactionBatch } from './cabinet-crdt';
import { CloudLogEntry } from './cloud';
import { Compression } from './compression';
import { Cabinet, CabinetAction } from './data-types/cabinet';
import { DeviceId } from './data-types/device';
import { EncryptionService } from './encryption-service';
import { ConcurrencyError, EncryptionKeyNotFoundError, NetworkError, UnknownError } from './errors';
import { KeyValuePair, KeyValueStorage, prefixKeyValueStorage, ValueStorage } from './key-value-storage';
import { Result } from './result';
import { OptimisticTransactionLog, TransactionLog } from './transaction-log';
import { defer, generateId, last, lastOrDefault, Lazy, MIN_STRING } from './utils';

/**
 * One record of the limbo log: a page of entries copied from the target log
 * that has not been decrypted yet.
 */
interface LimboSlice {
  /** Cloud log entries of the page, in the order the target log returned them. */
  readonly cloudEntries: readonly CloudLogEntry[];
}

/** Tuning knobs accepted by {@link CabinetService}. */
export interface CabinetServiceOptions {
  /**
   * When `true` or `undefined`, outgoing purgatory batches are merged with
   * `aggregate` and the logs are periodically refined; when `false`,
   * `aggregateDummy` is used and no refinement is triggered.
   */
  readonly optimize: boolean | undefined;
  /** Forwarded as `level` to the `Compression` instance used for cloud payloads. */
  readonly compressionLevel: 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | undefined;
  /** Page size used when replaying the abyss log to rebuild the cabinet (default 100). */
  readonly abyssPageSize?: number;
  /** Page size used when pulling entries from the target log into limbo (default 100). */
  readonly targetPageSize?: number;
  /** Refinement runs when the persisted sync counter is a multiple of this value (default 20). */
  readonly optimizeEveryNSyncs?: number;
  /** Passed as `aggregateWindow` to the abyss log refinement settings (default 100). */
  readonly abyssOptimizeWindow?: number;

  /** Extra options spread into the `CabinetCRDT` constructor arguments, after `deviceId`. */
  readonly _cabinetCRDTOptions?: Partial<CabinetCRDTOptions>;
}

/**
 * Keeps a local {@link CabinetCRDT} in sync with a remote ("target")
 * transaction log.
 *
 * Three local logs are used as staging areas:
 * - `limbo`     – pages of cloud entries pulled from the target log, not yet decrypted;
 * - `abyss`     – every transaction known locally; the cabinet is rebuilt from it;
 * - `purgatory` – transactions authored locally that still have to be pushed.
 */
export class CabinetService {
  /** Pages of cloud entries pulled from the target log, waiting to be decrypted. */
  private readonly limbo: OptimisticTransactionLog<LimboSlice>;
  /** Every transaction known locally (own and imported). */
  private readonly abyss: OptimisticTransactionLog<TransactionBatch>;
  /** Locally authored transactions that are not yet pushed to the target log. */
  private readonly purgatory: OptimisticTransactionLog<TransactionBatch>;

  /** Key of the last purgatory entry that was pushed to the target log. */
  private readonly targetSyncedKey: ValueStorage<string>;
  /** Key of the last limbo slice that was imported into the abyss. */
  private readonly abyssSyncedKey: ValueStorage<string>;
  /** Key of the last target-log entry that was copied into limbo. */
  private readonly limboSyncedKey: ValueStorage<string>;

  /** Number of the current sync, used to schedule periodic refinement. */
  private readonly optimizeCounter: ValueStorage<number>;

  /** Identifier of this device; imported transactions carrying it are skipped. */
  private readonly deviceId: ValueStorage<DeviceId>;

  /** In-memory cabinet, rebuilt from the abyss on first access. */
  private readonly cabinet: Lazy<CabinetCRDT>;
  private readonly compression: Compression;

  /**
   * @param storage Backing key-value storage; also provides locking and the
   *   `message$` / `post` channel used to share transactions.
   * @param encryptionService Encrypts outgoing and decrypts incoming cloud entries.
   * @param targetLog Remote log the local state is synchronised with.
   * @param options Optional tuning, see {@link CabinetServiceOptions}.
   */
  constructor(
    private readonly storage: KeyValueStorage<string, any>,
    private readonly encryptionService: EncryptionService,
    private readonly targetLog: TransactionLog<CloudLogEntry>,
    private readonly options?: CabinetServiceOptions,
  ) {
    this.compression = new Compression({ level: this.options?.compressionLevel });

    const wrapStorage = <TKey extends string, TValue>(prefix: string): KeyValueStorage<TKey, TValue> => {
      return prefixKeyValueStorage(storage, prefix);
    };

    this.limbo = new OptimisticTransactionLog(wrapStorage('limbo/'));
    this.abyss = new OptimisticTransactionLog(
      wrapStorage('abyss/'),
      {
        refinement: {
          aggregate: items => Promise.resolve(Result.ok(aggregate(items))),
          aggregateWindow: this.options?.abyssOptimizeWindow ?? 100,
        }
      }
    );
    this.purgatory = new OptimisticTransactionLog(wrapStorage('purgatory/'));

    this.targetSyncedKey = new ValueStorage(wrapStorage('target_sync_counter/'), MIN_STRING);
    this.abyssSyncedKey = new ValueStorage(wrapStorage('abyss_sync_counter/'), MIN_STRING);
    this.limboSyncedKey = new ValueStorage(wrapStorage('limbo_sync_counter/'), MIN_STRING);

    this.optimizeCounter = new ValueStorage(wrapStorage('optimize_counter/'), 1);

    this.deviceId = new ValueStorage(wrapStorage('device_id/'), generateId() as DeviceId);

    this.cabinet = new Lazy(() => this.reviveCabinet());

    // Apply transactions announced through the storage message channel to the
    // in-memory cabinet. The promise is not awaited by the subscriber, so a
    // rejection has to be handled here or it would surface as an unhandled
    // promise rejection.
    this.storage.message$
      .pipe(filter(x => x.transactions !== undefined))
      .subscribe(({ transactions }) => {
        this.cabinet
          .get()
          .then(x => x.apply(transactions))
          .catch(error => console.warn('[WARN] applying broadcast transactions failed:', error));
      });
  }

  /** Returns `true` when at least one local transaction is still waiting to be pushed. */
  async dirty(): Promise<boolean> {
    const purgatoryItems = await this.purgatory.list({ gt: await this.targetSyncedKey.get(), limit: 1 });
    return purgatoryItems.length > 0;
  }

  /**
   * Returns the current cabinet, importing pending limbo slices first.
   *
   * The result of that import is deliberately ignored: slices that cannot be
   * decrypted stay in limbo and are retried on a later call.
   */
  async getCabinet(): Promise<Cabinet> {
    const pending = await this.limbo.list({ gt: await this.abyssSyncedKey.get(), limit: 1 });
    if (pending.length > 0) {
      // needed in case the passcode was provided between syncs
      await this.syncAbyss();
    }

    return await this.cabinet.get();
  }

  /**
   * Executes `action` on the cabinet, announces the resulting transaction on
   * the storage message channel and persists it for the next sync.
   */
  async execute(action: CabinetAction): Promise<void> {
    const transaction = await this.cabinet.get().then(x => x.execute(action));
    this.storage.post({ transactions: [transaction] });
    await this.saveTransaction(transaction, 'user');
  }

  /** Runs a full synchronisation while holding the storage-level `sync` lock. */
  async sync(): Promise<Result<NetworkError | EncryptionKeyNotFoundError | UnknownError, void>> {
    return await this.storage.lock('sync', () => this.syncInternal());
  }

  /**
   * Pulls remote entries into limbo, imports them into the abyss, pushes the
   * pending purgatory entries to the target log and, every N-th sync,
   * triggers refinement of the target and abyss logs.
   */
  private async syncInternal(): Promise<Result<NetworkError | EncryptionKeyNotFoundError | UnknownError, void>> {
    // 1. sync limbo
    const syncLimboResult = await this.syncLimbo();
    if (syncLimboResult.isFailure()) {
      return syncLimboResult.forward();
    }

    // 2. sync abyss
    const syncAbyssResult = await this.syncAbyss();
    if (syncAbyssResult.isFailure()) {
      return syncAbyssResult.forward();
    }

    // Optimisation is on unless it is explicitly disabled.
    const optimize = this.options?.optimize ?? true;

    // 3. sync with target
    const targetSyncedKey = await this.targetSyncedKey.get();
    const purgatoryEvents = await this.purgatory.list({ gt: targetSyncedKey, limit: 10_000 });
    const batches = purgatoryEvents.map(x => x.value);

    const preparedEnvelopes = optimize ? aggregate(batches) : aggregateDummy(batches);
    for (const { value: envelopes, source } of preparedEnvelopes) {
      const cloudLog: Result<EncryptionKeyNotFoundError, CloudLogEntry> = await this.encryptionService.encrypt(
        this.compression.deflate(envelopes),
      );
      if (cloudLog.isFailure()) return cloudLog.forward();

      const response = await this.targetLog.append(cloudLog.value);

      if (response.isFailure()) {
        if (response.error instanceof ConcurrencyError) {
          // The target log moved on: restart so the new remote entries are
          // pulled first. Envelopes already pushed are skipped because
          // targetSyncedKey was advanced after each successful append.
          return await this.syncInternal();
        }

        return Result.error(new UnknownError(response.error));
      }

      // `source` describes which range of purgatoryEvents this envelope covers;
      // remember the key of the last event in that range.
      await this.targetSyncedKey.set(purgatoryEvents[source.offset + source.length - 1].key);
    }

    await this.purgatory.free({ lt: await this.targetSyncedKey.get() });

    if (optimize) {
      const optimizeCounter = await this.optimizeCounter.get();
      await this.optimizeCounter.set(optimizeCounter + 1);

      if (optimizeCounter % (this.options?.optimizeEveryNSyncs ?? 20) === 0) {
        const [targetResult] = await Promise.all([this.targetLog.refine(), this.abyss.refine()]);
        if (targetResult.isFailure()) {
          // Refinement is best-effort; a failure does not fail the sync.
          console.warn('[WARN] target refinement failed:', targetResult.error);
        }
      }
    }

    return Result.ok();
  }

  /**
   * Copies every target-log entry newer than `limboSyncedKey` into limbo,
   * page by page, advancing `limboSyncedKey` after each stored page.
   */
  private async syncLimbo(): Promise<Result<NetworkError | UnknownError, void>> {
    let limboSyncedKey = await this.limboSyncedKey.get();
    for (;;) {
      const slice = await this.targetLog.list({
        gt: limboSyncedKey,
        limit: this.options?.targetPageSize ?? 100,
      });
      if (slice.isFailure()) return Result.error(new UnknownError(slice.error));
      if (slice.value.length === 0) {
        break;
      }

      await this.limbo.append({ cloudEntries: slice.value.map(x => x.value) });

      limboSyncedKey = last(slice.value).key;
      await this.limboSyncedKey.set(limboSyncedKey);
    }

    return Result.ok();
  }

  /**
   * Decrypts pending limbo slices and imports the transactions of other
   * devices into the abyss, holding the storage-level `abyss_sync` lock.
   *
   * A slice is imported only after all of its entries were decrypted, so a
   * slice that fails is left untouched and can be retried without writing its
   * transactions to the abyss twice. Transactions imported from earlier
   * slices are still applied to the cabinet and announced before the failure
   * is returned.
   */
  private async syncAbyss(): Promise<Result<EncryptionKeyNotFoundError, void>> {
    return await this.storage.lock('abyss_sync', async () => {
      const limboSlices = await this.limbo.list({ gt: await this.abyssSyncedKey.get(), limit: 1_000 });
      const deviceId = await this.deviceId.get();

      const importedTransactions: Transaction[] = [];
      for (const slice of limboSlices) {
        const decryptedEntries = await Promise.all(
          slice.value.cloudEntries.map(x => this.encryptionService.decrypt(x))
        );

        // Collect first, persist later: nothing of this slice is written
        // until every entry is known to be readable.
        const foreignTransactions: Transaction[] = [];
        for (const decryptedEntry of decryptedEntries) {
          if (decryptedEntry.isFailure()) {
            await this.finishAbyssSync(importedTransactions);
            return decryptedEntry.forward();
          }

          for (const transaction of this.compression.inflate(decryptedEntry.value)) {
            // Transactions of this device are already in the abyss.
            if (transaction.deviceId !== deviceId) foreignTransactions.push(transaction);
          }
        }

        for (const transaction of foreignTransactions) {
          await this.saveTransaction(transaction, 'limbo');
          importedTransactions.push(transaction);
        }

        await this.abyssSyncedKey.set(slice.key);
      }

      await this.finishAbyssSync(importedTransactions);
      return Result.ok();
    });
  }

  /**
   * Releases limbo slices older than `abyssSyncedKey` and, if anything was
   * imported, applies it to the cabinet and announces it on the storage
   * message channel.
   */
  private async finishAbyssSync(importedTransactions: Transaction[]): Promise<void> {
    await this.limbo.free({ lt: await this.abyssSyncedKey.get() });
    if (importedTransactions.length > 0) {
      await this.cabinet.get().then(x => x.apply(importedTransactions));
      this.storage.post({ transactions: importedTransactions });
    }
  }

  /**
   * Persists a transaction: always to the abyss, and additionally to
   * purgatory when it was authored locally (`source === 'user'`).
   */
  private async saveTransaction(transaction: Transaction, source: 'user' | 'limbo'): Promise<void> {
    if (source === 'user') {
      await this.purgatory.append([transaction]);
    }
    await this.abyss.append([transaction]);
  }

  /** Builds a fresh cabinet by replaying the whole abyss log page by page. */
  private async reviveCabinet(): Promise<CabinetCRDT> {
    const state = new CabinetCRDT({ deviceId: await this.deviceId.get(), ...this.options?._cabinetCRDTOptions });
    let gt: string | undefined = undefined;

    while (true) {
      const transactions: ReadonlyArray<KeyValuePair<string, TransactionBatch>> = await this.abyss.list({
        gt,
        limit: this.options?.abyssPageSize ?? 100
      });

      if (transactions.length === 0) {
        break;
      }

      state.apply(transactions.flatMap(x => x.value));

      gt = lastOrDefault(transactions)?.key;
      // Pause between pages (see `defer` in ./utils).
      await defer();
    }

    return state;
  }
}
