watcher-ts/packages/util/src/database.ts

208 lines
6.6 KiB
TypeScript
Raw Normal View History

//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { Connection, ConnectionOptions, createConnection, DeepPartial, FindConditions, QueryRunner, Repository } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { BlockProgressInterface, EventInterface, SyncStatusInterface } from './types';
/**
 * Shared database helper built on a TypeORM connection.
 *
 * Most methods take the repository (or query runner) to operate on as an
 * argument instead of resolving it from the connection themselves, so the
 * caller decides which entity class and which transaction is used.
 */
export class Database {
  _config: ConnectionOptions
  _conn!: Connection

  /**
   * @param config TypeORM connection options; must be provided.
   */
  constructor (config: ConnectionOptions) {
    assert(config);
    this._config = config;
  }

  /**
   * Opens the connection using the stored options and a snake_case naming strategy.
   *
   * @returns The newly created connection.
   * @throws AssertionError if a connection has already been created.
   */
  async init (): Promise<Connection> {
    assert(!this._conn);

    this._conn = await createConnection({
      ...this._config,
      namingStrategy: new SnakeNamingStrategy()
    });

    return this._conn;
  }

  /**
   * Closes the underlying connection.
   */
  async close (): Promise<void> {
    return this._conn.close();
  }

  /**
   * Creates a query runner, connects it and starts a transaction on it.
   *
   * This method neither commits, rolls back nor releases the runner; that is
   * left to the caller.
   */
  async createTransactionRunner (): Promise<QueryRunner> {
    const queryRunner = this._conn.createQueryRunner();
    await queryRunner.connect();
    await queryRunner.startTransaction();

    return queryRunner;
  }

  /**
   * Fetches the sync status row, if one exists.
   */
  async getSyncStatus (repo: Repository<SyncStatusInterface>): Promise<SyncStatusInterface | undefined> {
    return repo.findOne();
  }

  /**
   * Advances the latest indexed block of the sync status row.
   *
   * The stored values are only replaced when `blockNumber` is greater than or
   * equal to the stored number; the row is saved either way.
   *
   * @throws AssertionError if no sync status row exists.
   */
  async updateSyncStatusIndexedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
    const entity = await repo.findOne();
    assert(entity);

    if (blockNumber >= entity.latestIndexedBlockNumber) {
      entity.latestIndexedBlockHash = blockHash;
      entity.latestIndexedBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /**
   * Advances the latest canonical block of the sync status row.
   *
   * The stored values are only replaced when `blockNumber` is greater than or
   * equal to the stored number; the row is saved either way.
   *
   * @throws AssertionError if no sync status row exists.
   */
  async updateSyncStatusCanonicalBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
    const entity = await repo.findOne();
    assert(entity);

    if (blockNumber >= entity.latestCanonicalBlockNumber) {
      entity.latestCanonicalBlockHash = blockHash;
      entity.latestCanonicalBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /**
   * Advances the chain head of the sync status row, creating the row if it is missing.
   *
   * A newly created row uses the given block as both chain head and latest
   * canonical block, and marks "nothing indexed yet" with an empty hash and -1.
   */
  async updateSyncStatusChainHead (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
    let entity = await repo.findOne();

    if (!entity) {
      entity = repo.create({
        chainHeadBlockHash: blockHash,
        chainHeadBlockNumber: blockNumber,
        latestCanonicalBlockHash: blockHash,
        latestCanonicalBlockNumber: blockNumber,
        latestIndexedBlockHash: '',
        latestIndexedBlockNumber: -1
      });
    }

    if (blockNumber >= entity.chainHeadBlockNumber) {
      entity.chainHeadBlockHash = blockHash;
      entity.chainHeadBlockNumber = blockNumber;
    }

    return await repo.save(entity);
  }

  /**
   * Looks up the progress row for a block by its hash.
   */
  async getBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string): Promise<BlockProgressInterface | undefined> {
    return repo.findOne({ where: { blockHash } });
  }

  /**
   * Returns all block progress rows at the given height with the given pruned flag.
   */
  async getBlocksAtHeight (repo: Repository<BlockProgressInterface>, height: number, isPruned: boolean): Promise<BlockProgressInterface[]> {
    return repo.createQueryBuilder('block_progress')
      .where('block_number = :height AND is_pruned = :isPruned', { height, isPruned })
      .getMany();
  }

  /**
   * Records that one more event of a block has been processed.
   *
   * Does nothing when the block has no progress row or is already complete.
   * The block is marked complete once the processed count reaches `numEvents`.
   *
   * @throws Error if `lastProcessedEventIndex` is not strictly greater than the stored index.
   */
  async updateBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
    const entity = await repo.findOne({ where: { blockHash } });

    if (entity && !entity.isComplete) {
      if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
        throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
      }

      entity.lastProcessedEventIndex = lastProcessedEventIndex;
      entity.numProcessedEvents++;

      if (entity.numProcessedEvents >= entity.numEvents) {
        entity.isComplete = true;
      }

      await repo.save(entity);
    }
  }

  /**
   * Sets `isPruned` on the given block (mutating it) and saves it.
   */
  async markBlockAsPruned (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface): Promise<BlockProgressInterface> {
    block.isPruned = true;

    return repo.save(block);
  }

  /**
   * Fetches a single event by id together with its `block` relation.
   */
  async getEvent (repo: Repository<EventInterface>, id: string): Promise<EventInterface | undefined> {
    return repo.findOne(id, { relations: ['block'] });
  }

  /**
   * Fetches all events of a block (joined with the block), ordered by ascending event id.
   */
  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string): Promise<EventInterface[]> {
    return repo.createQueryBuilder('event')
      .innerJoinAndSelect('event.block', 'block')
      .where('block_hash = :blockHash', { blockHash })
      .addOrderBy('event.id', 'ASC')
      .getMany();
  }

  /**
   * Stores a block progress row and the block's events.
   *
   * If a progress row already exists for `blockHash`, nothing is written.
   * Otherwise a progress row is created (complete straight away when there are
   * no events), every entry of `events` gets its `block` set to the saved row
   * (the input objects are mutated), and the events are bulk inserted.
   *
   * This method does not open a transaction itself.
   * NOTE(review): presumably the repositories come from a transactional query
   * runner so that both writes are atomic - confirm against callers.
   *
   * @throws AssertionError if `blockHash` or `parentHash` is missing/empty, or if
   *   `blockNumber` or `blockTimestamp` is undefined.
   */
  async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
    const {
      blockHash,
      blockNumber,
      blockTimestamp,
      parentHash
    } = block;

    assert(blockHash);
    // Presence checks rather than truthiness checks: 0 is a legitimate block
    // number (genesis block) and must not be rejected.
    assert(blockNumber !== undefined);
    assert(blockTimestamp !== undefined);
    assert(parentHash);

    // (1) Add an entry to the block progress table.
    // (2) Save all the events in the database.
    const numEvents = events.length;

    const existingProgress = await blockRepo.findOne({ where: { blockHash } });
    if (existingProgress) {
      // Block already saved earlier; leave it and its events untouched.
      return;
    }

    const entity = blockRepo.create({
      blockHash,
      parentHash,
      blockNumber,
      blockTimestamp,
      numEvents,
      numProcessedEvents: 0,
      lastProcessedEventIndex: -1,
      isComplete: (numEvents === 0)
    });

    const blockProgress = await blockRepo.save(entity);

    // Nothing to insert for a block without events; avoid issuing an empty INSERT.
    if (numEvents === 0) {
      return;
    }

    // Bulk insert events.
    for (const event of events) {
      event.block = blockProgress;
    }

    await eventRepo.createQueryBuilder().insert().values(events).execute();
  }

  /**
   * Finds entities of the given class through the query runner's manager.
   *
   * @param findConditions Optional conditions; all rows are returned when omitted.
   */
  async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
    const repo = queryRunner.manager.getRepository(entity);

    return repo.find(findConditions);
  }

  /**
   * Reports whether no rows exist for the given entity class.
   *
   * Uses its own query runner, which is always released afterwards.
   */
  async isEntityEmpty<Entity> (entity: new () => Entity): Promise<boolean> {
    const queryRunner = this._conn.createQueryRunner();

    try {
      await queryRunner.connect();

      // One row is enough to decide emptiness; do not load the whole table.
      const rows = await queryRunner.manager.getRepository(entity).find({ take: 1 });

      return rows.length === 0;
    } finally {
      await queryRunner.release();
    }
  }

  /**
   * Finds entities of the given class through the query runner's manager and removes them.
   *
   * @param findConditions Optional conditions; all rows are removed when omitted.
   */
  async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
    const repo = queryRunner.manager.getRepository(entity);

    const entities = await repo.find(findConditions);

    await repo.remove(entities);
  }
}