import Dexie, { Table } from 'dexie';
import { Observable, Subject } from 'rxjs';

import { getTimestamp, Timestamp } from './data/data-types/timestamp';
import { deserialize, serialize } from './data/encoding';
import { KeyValuePair, KeyValueStorage, ListKeyValueStorageOptions } from './data/key-value-storage';
import { generateId, last, MIN_STRING, padWith0, wait, waitWithRandom } from './data/utils';

/** Row of the `locks` table: a lease on `key` held by a single `owner`. */
type IDBLock = Readonly<{
  /** Name of the locked resource; primary key of the table. */
  key: string;
  /** Moment from which the lease counts as expired and another owner may take it over. */
  expiresAt: Timestamp;
  /** Id of the `lock()` call that currently holds the lease. */
  owner: string;
}>;

/** Row of the `messages` table: one serialized message posted by a storage instance. */
type IDBMessage = Readonly<{
  /** Primary key, built by `post()` as `<timestamp>/<creator id>/<sequence number>`. */
  key: string;
  /** Time at which the message was posted. */
  createdAt: Timestamp;
  /** Id of the storage instance that posted the message. */
  creator: string;
  /** Payload as produced by `serialize`. */
  message: Uint8Array;
}>;

/**
 * Dexie database with the three tables used by the storage:
 * key/value items, lock leases and posted messages.
 */
class Db extends Dexie {
  // The table properties are never assigned in this class (hence the `!`);
  // NOTE(review): this relies on Dexie attaching them from the schema below — see the Dexie docs.
  items!: Table<KeyValuePair<string, Uint8Array>, string>;
  locks!: Table<IDBLock, string>;
  messages!: Table<IDBMessage, string>;

  /**
   * @param name Name of the underlying database, passed straight to Dexie.
   */
  constructor(name: string) {
    super(name);

    // Schema version 1: every table is declared with the single index spec `key`.
    const schemaV1 = {
      items: 'key',
      locks: 'key',
      messages: 'key',
    };
    this.version(1).stores(schemaV1);
  }
}

/**
 * `KeyValueStorage` implementation backed by IndexedDB (through Dexie).
 *
 * On top of plain key/value access it provides:
 * - `lock`: lease-based mutual exclusion persisted in the `locks` table;
 * - `post` / `message$`: a polled message channel persisted in the `messages` table.
 *   An instance only receives messages whose creator id differs from its own.
 */
export class IndexedDBKeyValueStorage implements KeyValueStorage<string, any> {
  /** How long a lease stays valid after being acquired or renewed, in ms. */
  private static readonly LOCK_TTL_MS = 5_000;
  /** Base delay between two attempts to acquire a held lock, in ms. */
  private static readonly LOCK_RETRY_DELAY_MS = 50;
  /** Base delay between two lease renewals, in ms. Must stay well below `LOCK_TTL_MS`. */
  private static readonly LOCK_RENEW_INTERVAL_MS = 1_000;
  /** Base delay between two polls of the `messages` table, in ms. */
  private static readonly MESSAGE_POLL_INTERVAL_MS = 500;
  /** Age after which posted messages are purged, in ms. Also the base delay between purges. */
  private static readonly MESSAGE_TTL_MS = 60 * 1_000;

  /** Identifies this instance as the creator of the messages it posts. */
  private readonly id = generateId();
  /** Key of the newest message already read; polling only fetches keys above it. */
  private latestMessageKey = MIN_STRING;
  /** Per-instance sequence number that keeps message keys unique within one millisecond. */
  private messageCounter = 0;

  private readonly db: Db;

  private readonly messageSubject = new Subject<any>();

  /**
   * Opens (or creates) the database and starts the background message loops.
   *
   * @param name Name of the underlying database.
   */
  constructor(name: string) {
    this.db = new Db(name);

    // Fire-and-forget: this only kicks off the background loops.
    void this.processMessages();
  }

  /** Stream of deserialized messages posted by other instances. */
  get message$(): Observable<any> {
    return this.messageSubject.asObservable();
  }

  /**
   * Lists stored pairs in ascending key order, restricted by the given options.
   *
   * @param options Optional bounds (`gt`, `gte`, `lt`, `lte`), `prefix` and `limit`.
   * @returns The matching pairs with deserialized values.
   */
  async list(options?: ListKeyValueStorageOptions): Promise<readonly KeyValuePair<string, any>[]> {
    // The tightest inclusive lower bound implied by `gt`, `gte` and `prefix`.
    let lowerBound = '';
    for (const constraint of [options?.gt, options?.gte, options?.prefix]) {
      if (constraint !== undefined && lowerBound < constraint) {
        lowerBound = constraint;
      }
    }

    const query = this.db.items.where('key').aboveOrEqual(lowerBound);
    // The query bound is inclusive while `gt` is exclusive, so at most one row (the key equal
    // to `gt`) can be rejected at the front of the result; fetch one extra row to compensate.
    // All other rejected rows lie after the last match, so they cannot eat into the limit.
    const rows = await (options?.limit === undefined ? query : query.limit(options.limit + 1)).toArray();

    return rows
      .filter(({ key }) =>
        (options?.gt === undefined || key > options.gt) &&
        (options?.gte === undefined || key >= options.gte) &&
        (options?.lt === undefined || key < options.lt) &&
        (options?.lte === undefined || key <= options.lte) &&
        (options?.prefix === undefined || key.startsWith(options.prefix))
      )
      .slice(0, options?.limit)
      .map(({ key, value }) => ({ key, value: deserialize(value) as any }));
  }

  /**
   * Reads a single value.
   *
   * @returns The deserialized value, or `undefined` when the key is absent.
   */
  async get(key: string): Promise<any | undefined> {
    const item = await this.db.items.get(key);

    return item === undefined ? undefined : deserialize(item.value);
  }

  /** Serializes `value` and stores it under `key`, replacing any existing entry. */
  async set(key: string, value: any): Promise<void> {
    await this.db.items.put({ key, value: serialize(value) });
  }

  /** Removes the entry stored under `key`. */
  async delete(key: string): Promise<void> {
    await this.db.items.delete(key);
  }

  /**
   * Runs `fn` while holding an exclusive lease on `key`.
   *
   * The lease is acquired by polling, renewed in the background while `fn` runs and
   * released afterwards. If renewal stops working the lease expires after
   * `LOCK_TTL_MS`, at which point another caller may take it over; `fn` is not interrupted.
   *
   * @param key Name of the resource to lock.
   * @param fn Work to perform while the lease is held.
   * @returns Whatever `fn` resolves to; rejects with whatever `fn` rejects with.
   */
  async lock<TResult>(key: string, fn: () => Promise<TResult>): Promise<TResult> {
    const owner = generateId();

    while (!(await this.tryAcquireLock(key, owner))) {
      await waitWithRandom(IndexedDBKeyValueStorage.LOCK_RETRY_DELAY_MS);
    }

    let held = true;
    // Background renewal; a failing transaction must not surface as an unhandled rejection.
    void this.keepLockAlive(key, owner, () => held).catch((err: unknown) => {
      console.warn(`[WARN] Lock renewal failed. owner: ${owner}, key: ${key}`, err);
    });

    try {
      return await fn();
    } finally {
      held = false;

      try {
        await this.releaseLock(key, owner);
      } catch (err: unknown) {
        // Throwing from `finally` would replace the result (or error) of `fn`.
        // The lease expires on its own, so logging is enough here.
        console.warn(`[WARN] Wasn't able to release the lock. owner: ${owner}, key: ${key}`, err);
      }
    }
  }

  /**
   * Serializes `message` and appends it to the `messages` table, from where the
   * polling loops of other instances pick it up.
   */
  async post(message: any): Promise<void> {
    const createdAt = getTimestamp();
    // Reserve the sequence number before awaiting: otherwise two overlapping calls in the
    // same millisecond would build the same key and the second would overwrite the first.
    const sequence = this.messageCounter;
    this.messageCounter += 1;

    await this.db.messages.put({
      key: `${createdAt}/${this.id}/${padWith0(sequence, 9)}`,
      createdAt,
      creator: this.id,
      message: serialize(message),
    });
  }

  /**
   * Starts two never-ending background loops: one that polls for new messages and
   * emits them on `message$`, and one that purges old messages.
   * Resolves immediately; the loops keep running on their own.
   */
  async processMessages(): Promise<void> {
    void this.pollMessages();
    void this.purgeOldMessages();
  }

  /** Takes the lease on `key` for `owner` if it is free or expired; returns whether it was taken. */
  private async tryAcquireLock(key: string, owner: string): Promise<boolean> {
    return this.db.transaction('rw', this.db.locks, async () => {
      const lock = await this.db.locks.get(key);
      if (lock !== undefined && lock.expiresAt > getTimestamp()) {
        return false;
      }

      await this.putLease(key, owner);
      return true;
    });
  }

  /** Periodically extends the lease on `key` for as long as `isHeld()` returns true. */
  private async keepLockAlive(key: string, owner: string, isHeld: () => boolean): Promise<void> {
    await wait(IndexedDBKeyValueStorage.LOCK_RENEW_INTERVAL_MS);

    while (isHeld()) {
      const renewed = await this.db.transaction('rw', this.db.locks, async () => {
        const lock = await this.db.locks.get(key);
        if (lock?.owner !== owner) {
          return false;
        }

        await this.putLease(key, owner);
        return true;
      });

      if (!renewed) {
        // If the lock was released while the renewal was in flight, the missing row is expected.
        if (isHeld()) {
          console.warn(`[WARN] Wasn't able to reacquire the lock. owner: ${owner}, key: ${key}`);
        }
        return;
      }

      await waitWithRandom(IndexedDBKeyValueStorage.LOCK_RENEW_INTERVAL_MS);
    }
  }

  /** Deletes the lease on `key` if `owner` still holds it; warns otherwise. */
  private async releaseLock(key: string, owner: string): Promise<void> {
    await this.db.transaction('rw', this.db.locks, async () => {
      const lock = await this.db.locks.get(key);
      if (lock?.owner === owner) {
        await this.db.locks.delete(key);
      } else {
        console.warn(`[WARN] Wasn't able to release the lock. owner: ${owner}, key: ${key}`);
      }
    });
  }

  /** Writes a lease on `key` for `owner` that expires `LOCK_TTL_MS` from now. */
  private async putLease(key: string, owner: string): Promise<void> {
    const expiresAt = (getTimestamp() + IndexedDBKeyValueStorage.LOCK_TTL_MS) as Timestamp;
    await this.db.locks.put({ key, expiresAt, owner });
  }

  /** Polls the `messages` table forever and emits messages created by other instances. */
  private async pollMessages(): Promise<void> {
    while (true) {
      try {
        const wrappers = await this.db.messages.where('key').above(this.latestMessageKey).toArray();
        if (wrappers.length > 0) {
          // Advance the cursor first, so that one message failing to decode does not make
          // the whole batch be re-read (and earlier messages re-emitted) on every poll.
          this.latestMessageKey = last(wrappers).key;

          for (const wrapper of wrappers) {
            if (wrapper.creator === this.id) continue;

            try {
              this.messageSubject.next(deserialize(wrapper.message));
            } catch (err: unknown) {
              console.log('Unknown error occurred during reading messages from indexeddb:', err);
            }
          }
        }
      } catch (err: unknown) {
        console.log('Unknown error occurred during reading messages from indexeddb:', err);
      }

      // Outside the try block so that a persistent failure cannot turn into a busy loop.
      await waitWithRandom(IndexedDBKeyValueStorage.MESSAGE_POLL_INTERVAL_MS);
    }
  }

  /** Deletes, forever and periodically, the messages older than `MESSAGE_TTL_MS`. */
  private async purgeOldMessages(): Promise<void> {
    while (true) {
      try {
        // Message keys start with their creation timestamp, so a key comparison against
        // the cutoff timestamp selects the old messages.
        const cutoff = (getTimestamp() - IndexedDBKeyValueStorage.MESSAGE_TTL_MS).toString();
        await this.db.messages.where('key').below(cutoff).delete();
      } catch (err: unknown) {
        console.log('Unknown error occurred during deletion of messages from indexeddb:', err);
      }

      // Outside the try block so that a persistent failure cannot turn into a busy loop.
      await waitWithRandom(IndexedDBKeyValueStorage.MESSAGE_TTL_MS);
    }
  }
}
