From cbc8dce88b493b25fe4cc46cce934c8dd3032919 Mon Sep 17 00:00:00 2001 From: Ashwin Phatak Date: Wed, 11 Aug 2021 15:23:44 +0530 Subject: [PATCH] Implement transaction for uni-info-watcher blocks and events update. (#203) Co-authored-by: nabarun --- packages/uni-info-watcher/src/database.ts | 104 ++++++++++------------ packages/uni-info-watcher/src/indexer.ts | 42 ++++++++- 2 files changed, 88 insertions(+), 58 deletions(-) diff --git a/packages/uni-info-watcher/src/database.ts b/packages/uni-info-watcher/src/database.ts index 76ca9e9f..5b4af7e5 100644 --- a/packages/uni-info-watcher/src/database.ts +++ b/packages/uni-info-watcher/src/database.ts @@ -684,7 +684,7 @@ export class Database { .getMany(); } - async saveEvents (block: Block, events: DeepPartial[]): Promise { + async saveEvents (queryRunner: QueryRunner, block: Block, events: DeepPartial[]): Promise { const { hash: blockHash, number: blockNumber, @@ -700,55 +700,51 @@ export class Database { // In a transaction: // (1) Save all the events in the database. // (2) Add an entry to the block progress table. - await this._conn.transaction(async (tx) => { - const numEvents = events.length; - const blockProgressRepo = tx.getRepository(BlockProgress); - let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); + const numEvents = events.length; + const blockProgressRepo = queryRunner.manager.getRepository(BlockProgress); + let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); - if (!blockProgress) { - const entity = blockProgressRepo.create({ - blockHash, - parentHash, - blockNumber, - blockTimestamp, - numEvents, - numProcessedEvents: 0, - lastProcessedEventIndex: -1, - isComplete: (numEvents === 0) - }); + if (!blockProgress) { + const entity = blockProgressRepo.create({ + blockHash, + parentHash, + blockNumber, + blockTimestamp, + numEvents, + numProcessedEvents: 0, + lastProcessedEventIndex: -1, + isComplete: (numEvents === 0) + }); - blockProgress = await blockProgressRepo.save(entity); + blockProgress = await blockProgressRepo.save(entity); - // Bulk insert events. - events.forEach(event => { event.block = blockProgress; }); - await tx.createQueryBuilder().insert().into(Event).values(events).execute(); - } - }); + // Bulk insert events. + events.forEach(event => { event.block = blockProgress; }); + await queryRunner.manager.createQueryBuilder().insert().into(Event).values(events).execute(); + } } async getEvent (id: string): Promise { return this._conn.getRepository(Event).findOne(id, { relations: ['block'] }); } - async updateSyncStatus (blockHash: string, blockNumber: number): Promise { - return await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(SyncStatus); + async updateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); - let entity = await repo.findOne(); - if (!entity) { - entity = repo.create({ - latestCanonicalBlockHash: blockHash, - latestCanonicalBlockNumber: blockNumber - }); - } + let entity = await repo.findOne(); + if (!entity) { + entity = repo.create({ + latestCanonicalBlockHash: blockHash, + latestCanonicalBlockNumber: blockNumber + }); + } - if (blockNumber >= entity.latestCanonicalBlockNumber) { - entity.chainHeadBlockHash = blockHash; - entity.chainHeadBlockNumber = blockNumber; - } + if (blockNumber >= entity.latestCanonicalBlockNumber) { + entity.chainHeadBlockHash = blockHash; + entity.chainHeadBlockNumber = blockNumber; + } - return await repo.save(entity); - }); + return await repo.save(entity); } async getSyncStatus (queryRunner: QueryRunner): Promise { @@ -761,24 +757,22 @@ export class Database { return repo.findOne({ where: { blockHash } }); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(BlockProgress); - const entity = await repo.findOne({ where: { blockHash } }); - if (entity && !entity.isComplete) { - if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { - throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); - } - - entity.lastProcessedEventIndex = lastProcessedEventIndex; - entity.numProcessedEvents++; - if (entity.numProcessedEvents >= entity.numEvents) { - entity.isComplete = true; - } - - await repo.save(entity); + async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + const entity = await repo.findOne({ where: { blockHash } }); + if (entity && !entity.isComplete) { + if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { + throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); } - }); + + entity.lastProcessedEventIndex = lastProcessedEventIndex; + entity.numProcessedEvents++; + if (entity.numProcessedEvents >= entity.numEvents) { + entity.isComplete = true; + } + + await repo.save(entity); + } } async _getPrevEntityVersion (queryRunner: QueryRunner, repo: Repository, findOptions: { [key: string]: any }): Promise { diff --git a/packages/uni-info-watcher/src/indexer.ts b/packages/uni-info-watcher/src/indexer.ts index 500b8800..f8bc3013 100644 --- a/packages/uni-info-watcher/src/indexer.ts +++ b/packages/uni-info-watcher/src/indexer.ts @@ -164,7 +164,20 @@ export class Indexer { } async updateSyncStatus (blockHash: string, blockNumber: number): Promise { - return this._db.updateSyncStatus(blockHash, blockNumber); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateSyncStatus(dbTx, blockHash, blockNumber); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getSyncStatus (): Promise { @@ -198,7 +211,20 @@ export class Indexer { } async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getBundle (id: string, block: BlockHeight): Promise { @@ -319,7 +345,17 @@ export class Indexer { }); } - await this._db.saveEvents(block, dbEvents); + const dbTx = await this._db.createTransactionRunner(); + + try { + await this._db.saveEvents(dbTx, block, dbEvents); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise {