Implement transactions in uni-watcher. (#190)

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-06 17:53:55 +05:30 committed by GitHub
parent 63620f0a0a
commit 91463136d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 215 additions and 113 deletions

View File

@ -1,6 +1,6 @@
import assert from 'assert';
import _ from 'lodash';
import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'typeorm';
import { Connection, ConnectionOptions, createConnection, DeepPartial, QueryRunner } from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
@ -30,6 +30,13 @@ export class Database {
return this._conn.close();
}
async createTransactionRunner (): Promise<QueryRunner> {
const queryRunner = this._conn.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
return queryRunner;
}
async getBlockEvents (blockHash: string): Promise<Event[]> {
return this._conn.getRepository(Event)
.createQueryBuilder('event')
@ -68,7 +75,7 @@ export class Database {
return events;
}
async saveEvents (block: any, events: DeepPartial<Event>[]): Promise<void> {
async saveEvents (queryRunner: QueryRunner, block: any, events: DeepPartial<Event>[]): Promise<void> {
const {
hash: blockHash,
number: blockNumber,
@ -86,89 +93,81 @@ export class Database {
// In a transaction:
// (1) Save all the events in the database.
// (2) Add an entry to the block progress table.
await this._conn.transaction(async (tx) => {
const numEvents = events.length;
const blockProgressRepo = tx.getRepository(BlockProgress);
let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
if (!blockProgress) {
const entity = blockProgressRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
const numEvents = events.length;
const blockProgressRepo = queryRunner.manager.getRepository(BlockProgress);
let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
if (!blockProgress) {
const entity = blockProgressRepo.create({
blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
lastProcessedEventIndex: -1,
isComplete: (numEvents === 0)
});
blockProgress = await blockProgressRepo.save(entity);
blockProgress = await blockProgressRepo.save(entity);
// Bulk insert events.
events.forEach(event => {
event.block = blockProgress;
});
// Bulk insert events.
events.forEach(event => {
event.block = blockProgress;
});
await tx.createQueryBuilder().insert().into(Event).values(events).execute();
}
});
await queryRunner.manager.createQueryBuilder().insert().into(Event).values(events).execute();
}
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(SyncStatus);
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
const entity = await repo.findOne();
assert(entity);
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestIndexedBlockNumber) {
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;
}
if (blockNumber >= entity.latestIndexedBlockNumber) {
entity.latestIndexedBlockHash = blockHash;
entity.latestIndexedBlockNumber = blockNumber;
}
return await repo.save(entity);
});
return await repo.save(entity);
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(SyncStatus);
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
const entity = await repo.findOne();
assert(entity);
const entity = await repo.findOne();
assert(entity);
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
}
if (blockNumber >= entity.latestCanonicalBlockNumber) {
entity.latestCanonicalBlockHash = blockHash;
entity.latestCanonicalBlockNumber = blockNumber;
}
return await repo.save(entity);
});
return await repo.save(entity);
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(SyncStatus);
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
chainHeadBlockHash: blockHash,
chainHeadBlockNumber: blockNumber,
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber,
latestIndexedBlockHash: '',
latestIndexedBlockNumber: -1
});
}
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
chainHeadBlockHash: blockHash,
chainHeadBlockNumber: blockNumber,
latestCanonicalBlockHash: blockHash,
latestCanonicalBlockNumber: blockNumber,
latestIndexedBlockHash: '',
latestIndexedBlockNumber: -1
});
}
if (blockNumber >= entity.chainHeadBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
if (blockNumber >= entity.chainHeadBlockNumber) {
entity.chainHeadBlockHash = blockHash;
entity.chainHeadBlockNumber = blockNumber;
}
return await repo.save(entity);
});
return await repo.save(entity);
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -180,8 +179,8 @@ export class Database {
return this._conn.getRepository(Event).findOne(id, { relations: ['block'] });
}
async saveEventEntity (entity: Event): Promise<Event> {
const repo = this._conn.getRepository(Event);
async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise<Event> {
const repo = queryRunner.manager.getRepository(Event);
return await repo.save(entity);
}
@ -200,20 +199,18 @@ export class Database {
.getOne();
}
async saveContract (address: string, kind: string, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
const repo = queryRunner.manager.getRepository(Contract);
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
const numRows = await repo
.createQueryBuilder()
.where('address = :address', { address })
.getCount();
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
});
if (numRows === 0) {
const entity = repo.create({ address, kind, startingBlock });
await repo.save(entity);
}
}
async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
@ -223,8 +220,8 @@ export class Database {
.getMany();
}
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
const repo = this._conn.getRepository(BlockProgress);
async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
block.isPruned = true;
return repo.save(block);
}
@ -234,23 +231,21 @@ export class Database {
return repo.findOne({ where: { blockHash } });
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
}
entity.lastProcessedEventIndex = lastProcessedEventIndex;
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
}
await repo.save(entity);
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
const repo = queryRunner.manager.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) {
throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`);
}
});
entity.lastProcessedEventIndex = lastProcessedEventIndex;
entity.numProcessedEvents++;
if (entity.numProcessedEvents >= entity.numEvents) {
entity.isComplete = true;
}
await repo.save(entity);
}
}
}

View File

@ -1,5 +1,5 @@
import debug from 'debug';
import { DeepPartial } from 'typeorm';
import { DeepPartial, QueryRunner } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import assert from 'assert';
@ -138,13 +138,13 @@ export class Indexer {
return result;
}
async triggerIndexingOnEvent (dbEvent: Event): Promise<void> {
async triggerIndexingOnEvent (dbTx: QueryRunner, dbEvent: Event): Promise<void> {
const re = this.getResultEvent(dbEvent);
switch (re.event.__typename) {
case 'PoolCreatedEvent': {
const poolContract = ethers.utils.getAddress(re.event.pool);
await this._db.saveContract(poolContract, KIND_POOL, dbEvent.block.blockNumber);
await this._db.saveContract(dbTx, poolContract, KIND_POOL, dbEvent.block.blockNumber);
}
}
}
@ -154,8 +154,18 @@ export class Indexer {
}
async processEvent (event: Event): Promise<void> {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(event);
const dbTx = await this._db.createTransactionRunner();
try {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(dbTx, event);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
parseEventNameAndArgs (kind: string, logObj: any): any {
@ -371,19 +381,68 @@ export class Indexer {
});
}
await this._db.saveEvents(block, dbEvents);
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.saveEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._db.updateSyncStatusIndexedBlock(blockHash, blockNumber);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._db.updateSyncStatusChainHead(blockHash, blockNumber);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
return this._db.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getSyncStatus (): Promise<SyncStatus | undefined> {
@ -400,7 +459,20 @@ export class Indexer {
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._db.saveEventEntity(dbEvent);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = this._db.saveEventEntity(dbTx, dbEvent);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
@ -439,11 +511,37 @@ export class Indexer {
}
async markBlockAsPruned (block: BlockProgress): Promise<BlockProgress> {
return this._db.markBlockAsPruned(block);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.markBlockAsPruned(dbTx, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex);
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {

View File

@ -6,8 +6,17 @@ import { Client as UniClient } from '../client';
export async function watchContract (db: Database, address: string, kind: string, startingBlock: number): Promise<void> {
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const contractAddress = ethers.utils.getAddress(address);
const dbTx = await db.createTransactionRunner();
await db.saveContract(contractAddress, kind, startingBlock);
try {
await db.saveContract(dbTx, contractAddress, kind, startingBlock);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
export const watchEvent = async (uniClient: UniClient, eventType: string): Promise<any> => {