import { Aggregate } from './aggregation';
import { ConcurrencyError, NetworkError } from './errors';
import { KeyValuePair, KeyValueStorage, prefixKeyValueStorage, ValueStorage } from './key-value-storage';
import { Result } from './result';
import { Locker, MIN_STRING, padWith0 } from './utils';

/**
 * Query options accepted by the `list` operations of the transaction log and its store.
 */
export interface TransactionLogListOptions {
  /**
   * Lower key bound. `TransactionLog.refine` passes the commit boundary here to read
   * the entries that follow it (presumably exclusive — confirm against the storage).
   */
  gt?: string;
  /**
   * Requested page size. `TransactionLog.refine` treats a result shorter than this
   * as "not a full window" (presumably the maximum number of entries returned).
   */
  limit: number;
}
/**
 * Options for the `free` operations, which delete entries from the log.
 */
export interface TransactionLogFreeOptions {
  /**
   * Upper key bound: `KeyValueTransactionLogStore.free` deletes every entry returned by
   * listing with this `lt` (presumably keys strictly below it — confirm against the storage).
   */
  lt: string;
}
/**
 * One step of a commit: replace the entries in the key range between `gt` and `lte`
 * with a single aggregate.
 */
export interface TransactionLogCommitCommand<T> {
  /**
   * Commit boundary the command was computed against. `KeyValueTransactionLogStore.commit`
   * rejects the command with a `ConcurrencyError` when the stored boundary differs.
   */
  gt: string;
  /**
   * Key of the last entry covered by the aggregate. `KeyValueTransactionLogStore.commit`
   * stores the aggregate under this key and makes it the new commit boundary.
   */
  lte: string;
  /** The aggregated value that replaces the covered entries. */
  aggregate: T;
}

/**
 * Persistence contract that `TransactionLog` is built on.
 *
 * Every operation reports its outcome through a `Result` instead of its return value alone.
 */
export interface TransactionLogStore<T> {
  /** Resolves with the key currently recorded as the commit boundary. */
  getCommitBoundary(): Promise<Result<unknown, string>>;

  /**
   * Applies the given commit commands.
   * The error side may carry a `ConcurrencyError` (see `KeyValueTransactionLogStore.commit`,
   * which reports it when a command's `gt` no longer matches the stored boundary).
   */
  commit(commands: ReadonlyArray<TransactionLogCommitCommand<T>>): Promise<Result<ConcurrencyError | unknown, void>>;

  /** Reads a page of log entries as key/value pairs. */
  list(options: TransactionLogListOptions): Promise<Result<unknown, readonly KeyValuePair<string, T>[]>>;

  /** Adds the items to the log and resolves with the keys assigned to them. */
  append(items: ReadonlyArray<T>): Promise<Result<unknown, string[]>>;

  /** Deletes entries selected by `options`. */
  free(options: TransactionLogFreeOptions): Promise<Result<unknown, void>>;
}

/**
 * Optional behaviour for `TransactionLog`.
 */
export interface TransactionLogOptions<T> {
  /**
   * Enables `TransactionLog.refine`. Without it `refine` only logs a warning and succeeds.
   */
  readonly refinement?: {
    /**
     * Number of entries read and aggregated per refinement pass; `refine` stops as soon as
     * fewer entries than this remain after the commit boundary.
     */
    readonly aggregateWindow: number;
    /**
     * Collapses the values of one window into aggregates. `refine` reads `value` and
     * `source.offset` / `source.length` from each returned aggregate, using the source range
     * as positions within the `entries` array passed in.
     */
    readonly aggregate: (entries: ReadonlyArray<T>) => Promise<Result<unknown, readonly Aggregate<T>[]>>;
  };
}

/**
 * Append-only log on top of a `TransactionLogStore`.
 *
 * `list`, `append` and `free` delegate straight to the store. `refine` compacts the log by
 * replacing full windows of entries after the commit boundary with aggregates.
 */
export class TransactionLog<T> {
  /**
   * @param store   Backend that persists entries and the commit boundary.
   * @param options Optional settings; `refinement` is required for `refine` to do any work.
   */
  constructor(
    private store: TransactionLogStore<T>,
    private options?: TransactionLogOptions<T>,
  ) { }

  /** Reads a page of entries; delegates to the store. */
  async list(options: TransactionLogListOptions): Promise<Result<unknown, ReadonlyArray<KeyValuePair<string, T>>>> {
    return await this.store.list(options);
  }

  /** Appends the entries and resolves with their keys; delegates to the store. */
  async append(...entries: readonly T[]): Promise<Result<unknown, string[]>> {
    return await this.store.append(entries);
  }

  /** Deletes the entries selected by `options`; delegates to the store. */
  async free(options: TransactionLogFreeOptions): Promise<Result<unknown, void>> {
    return await this.store.free(options);
  }

  /**
   * Repeatedly aggregates full windows of entries that follow the commit boundary and
   * commits the aggregates, until no full window is left.
   *
   * @returns A successful result when
   *   - no `refinement` option is configured (a warning is logged),
   *   - fewer than `aggregateWindow` entries remain after the boundary, or
   *   - the aggregate function produced nothing for a window (no progress is possible).
   *   A failed result from the store or the aggregate function is forwarded as is; an
   *   aggregate whose source range points outside the window yields a failed result
   *   carrying a `RangeError`.
   */
  async refine(): Promise<Result<unknown, void>> {
    const refinement = this.options?.refinement;
    if (refinement === undefined) {
      console.warn('[TransactionLog] refine requires aggregate function');
      return Result.ok();
    }

    while (true) {
      // Re-read the boundary on every pass: the previous commit has moved it.
      const commitBoundaryResult = await this.store.getCommitBoundary();
      if (commitBoundaryResult.isFailure()) {
        return commitBoundaryResult.forward();
      }

      let commitBoundary = commitBoundaryResult.value;
      const entries = await this.store.list({ gt: commitBoundary, limit: refinement.aggregateWindow });
      if (entries.isFailure()) {
        return entries.forward();
      }

      if (entries.value.length < refinement.aggregateWindow) {
        // not enough items for optimization
        break;
      }

      const aggregationResult = await refinement.aggregate(entries.value.map(({ value }) => value));
      if (aggregationResult.isFailure()) return aggregationResult.forward();

      const commands: TransactionLogCommitCommand<T>[] = [];
      for (const { value: aggregate, source } of aggregationResult.value) {
        // The aggregate covers window positions [offset, offset + length); its last
        // covered entry supplies the upper key bound of the command.
        const lastCovered = entries.value[source.offset + source.length - 1];
        if (lastCovered === undefined) {
          // Report a malformed source range as a failure instead of crashing on `.key`.
          return Result.error(new RangeError('[TransactionLog] aggregate source range is outside of the aggregated window'));
        }
        const lte = lastCovered.key;

        commands.push({ aggregate, gt: commitBoundary, lte });

        // Chain the commands: the next one starts where this one ends.
        commitBoundary = lte;
      }

      if (commands.length === 0) {
        // Nothing to commit means the boundary cannot advance; stopping here avoids
        // re-reading and re-aggregating the very same window forever.
        break;
      }

      const result = await this.store.commit(commands);
      if (result.isFailure()) return result.forward();
    }

    return Result.ok();
  }
}

/**
 * `TransactionLogStore` implementation that keeps everything in one `KeyValueStorage`,
 * split by key prefix: `commit_boundary/` for the boundary, `counter/` for the key
 * counter and `items/` for the log entries. The storage object itself also serves as
 * the lock provider for `append` and `commit`.
 */
export class KeyValueTransactionLogStore<T> implements TransactionLogStore<T> {
  private counter: ValueStorage<number>;
  private boundary: ValueStorage<string>;
  private entries: KeyValueStorage<string, T>;
  private locker: Locker<string>;

  /**
   * @param storage  Underlying storage; also used as the `Locker`.
   * @param pageSize Page size used while deleting entries replaced by a commit.
   */
  constructor(
    storage: KeyValueStorage<string, any>,
    private pageSize: number
  ) {
    this.locker = storage;

    // Second ValueStorage argument is presumably the value reported before anything is stored — confirm.
    const boundaryStorage = prefixKeyValueStorage(storage, 'commit_boundary/');
    this.boundary = new ValueStorage<string>(boundaryStorage, MIN_STRING);

    const counterStorage = prefixKeyValueStorage(storage, 'counter/');
    this.counter = new ValueStorage<number>(counterStorage, 0);

    this.entries = prefixKeyValueStorage(storage, 'items/');
  }

  /** Resolves with the stored commit boundary wrapped in a successful result. */
  async getCommitBoundary(): Promise<Result<unknown, string>> {
    const current = await this.boundary.get();
    return Result.ok(current);
  }

  /**
   * Applies the commands one after another while holding the `'write'` lock.
   *
   * For each command: verify that the stored boundary equals `gt`, move the boundary to
   * `lte`, store the aggregate under `lte`, then delete the entries listed between `gt`
   * and `lte` page by page.
   *
   * @returns A `ConcurrencyError` failure as soon as a command's `gt` does not match the
   *   stored boundary (commands applied before it stay applied); success otherwise.
   */
  async commit(
    commands: ReadonlyArray<TransactionLogCommitCommand<T>>
  ): Promise<Result<NetworkError | ConcurrencyError, void>> {
    return await this.locker.lock('write', async () => {
      for (const { gt, lte, aggregate } of commands) {
        const storedBoundary = await this.boundary.get();
        if (storedBoundary !== gt) {
          return Result.error(new ConcurrencyError());
        }

        await this.boundary.set(lte);
        await this.entries.set(lte, aggregate);

        // clean up log: keep fetching pages of replaced entries until none are left
        const nextStalePage = () => this.entries.list({ gt, lt: lte, limit: this.pageSize });
        for (let page = await nextStalePage(); page.length !== 0; page = await nextStalePage()) {
          await Promise.all(page.map(stale => this.entries.delete(stale.key)));
        }
      }

      return Result.ok();
    });
  }

  /** Lists entries from the `items/` storage and wraps them in a successful result. */
  async list(options: TransactionLogListOptions): Promise<Result<unknown, ReadonlyArray<KeyValuePair<string, T>>>> {
    const page = await this.entries.list(options);
    return Result.ok(page);
  }

  /**
   * Stores each item under a key derived from the running counter, holding the
   * `'write'` lock for the whole batch.
   *
   * @returns The generated keys, in the order of `entries`.
   */
  async append(entries: readonly T[]): Promise<Result<unknown, string[]>> {
    return await this.locker.lock('write', async () => {
      const keys: string[] = [];
      for (const item of entries) {
        const sequence = await this.counter.get();
        // The counter is advanced before the entry itself is written.
        await this.counter.set(sequence + 1);

        const key = this.createKey(sequence);
        await this.entries.set(key, item);
        keys.push(key);
      }

      return Result.ok(keys);
    });
  }

  /** Deletes, one at a time, every entry returned by listing with the given `lt` bound. */
  async free({ lt }: TransactionLogFreeOptions): Promise<Result<unknown, void>> {
    const doomed = await this.entries.list({ lt });
    for (const { key } of doomed) {
      await this.entries.delete(key);
    }

    return Result.ok();
  }

  /** Builds the entry key for a counter value (presumably zero-padded to 9 digits — see `padWith0`). */
  private createKey(counter: number) {
    return padWith0(counter, 9);
  }
}

/**
 * Options for `OptimisticTransactionLog`: everything from `TransactionLogOptions`
 * plus the page size of the underlying key-value store.
 */
export interface OptimisticTransactionLogOptions<T> extends TransactionLogOptions<T> {
  /**
   * Page size passed to `KeyValueTransactionLogStore`.
   * `OptimisticTransactionLog` uses 100 when this is omitted.
   */
  readonly pageSize?: number;
}

/**
 * Convenience wrapper around `TransactionLog` backed by a `KeyValueTransactionLogStore`.
 *
 * Each method awaits the underlying `Result` and unwraps it with `getOrThrow()`, so
 * callers receive plain values (failures presumably surface as thrown errors, as the
 * name `getOrThrow` suggests).
 */
export class OptimisticTransactionLog<T> {
  private log: TransactionLog<T>;

  /**
   * @param storage Key-value storage that holds the log.
   * @param options Log options; `pageSize` defaults to 100.
   */
  constructor(storage: KeyValueStorage<string, T>, options?: OptimisticTransactionLogOptions<T>) {
    const pageSize = options?.pageSize ?? 100;
    const store = new KeyValueTransactionLogStore<T>(storage, pageSize);
    this.log = new TransactionLog<T>(store, options);
  }

  /** Reads a page of entries. */
  async list(options: TransactionLogListOptions): Promise<ReadonlyArray<KeyValuePair<string, T>>> {
    const outcome = await this.log.list(options);
    return outcome.getOrThrow();
  }

  /** Appends the entries and resolves with their keys. */
  async append(...entries: readonly T[]): Promise<string[]> {
    const outcome = await this.log.append(...entries);
    return outcome.getOrThrow();
  }

  /** Deletes the entries selected by `options`. */
  async free(options: TransactionLogFreeOptions): Promise<void> {
    const outcome = await this.log.free(options);
    return outcome.getOrThrow();
  }

  /** Runs log refinement (see `TransactionLog.refine`). */
  async refine(): Promise<void> {
    const outcome = await this.log.refine();
    return outcome.getOrThrow();
  }
}
