Implement transaction for uni-info-watcher blocks and events update. (#203)

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-11 15:23:44 +05:30 committed by GitHub
parent 1372fc74ef
commit cbc8dce88b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 58 deletions

View File

@ -684,7 +684,7 @@ export class Database {
.getMany();
}
async saveEvents (block: Block, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: Block, events: DeepPartial<Event>[]): Promise<void> {
const {
hash: blockHash,
number: blockNumber,
@ -700,55 +700,51 @@ export class Database {
// In a transaction:
// (1) Save all the events in the database.
// (2) Add an entry to the block progress table.
await this._conn.transaction(async (tx) => {
const numEvents = events.length;
const blockProgressRepo = tx.getRepository(BlockProgress);
let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
const numEvents = events.length;
const blockProgressRepo = queryRunner.manager.getRepository(BlockProgress);
let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
if (!blockProgress) {
const entity = blockProgressRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
if (!blockProgress) {
const entity = blockProgressRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
blockProgress = await blockProgressRepo.save(entity);
blockProgress = await blockProgressRepo.save(entity);
// Bulk insert events.
events.forEach(event => { event.block = blockProgress; });
await tx.createQueryBuilder().insert().into(Event).values(events).execute();
}
});
// Bulk insert events.
events.forEach(event => { event.block = blockProgress; });
await queryRunner.manager.createQueryBuilder().insert().into(Event).values(events).execute();
}
}
async getEvent (id: string): Promise<Event | undefined> {
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
}
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(SyncStatus);
async updateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber
});
}
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber
});
}
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
return await repo.save(entity);
});
return await repo.save(entity);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
@ -761,24 +757,22 @@ export class Database {
return repo.findOne({ where: { blockHash } });
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
}
entity.lastProcessedEventIndex = lastProcessedEventIndex;
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
}
await repo.save(entity);
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
}
});
entity.lastProcessedEventIndex = lastProcessedEventIndex;
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
}
await repo.save(entity);
}
}
async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {

View File

@ -164,7 +164,20 @@ export class Indexer {
}
async updateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._db.updateSyncStatus(blockHash, blockNumber);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatus(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -198,7 +211,20 @@ export class Indexer {
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBundle (id: string, block: BlockHeight): Promise<Bundle | undefined> {
@ -319,7 +345,17 @@ export class Indexer {
});
}
await this._db.saveEvents(block, dbEvents);
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise<void> {