Fix transaction query runner in event handlers. (#192)

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-08-09 11:30:43 +05:30 committed by GitHub
parent 88121f9390
commit a9d9b39c37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 85 additions and 40 deletions

View File

@ -112,7 +112,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Factory>); let entity = await repo.findOne(findOptions as FindOneOptions<Factory>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -140,7 +140,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Bundle>); let entity = await repo.findOne(findOptions as FindOneOptions<Bundle>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -165,7 +165,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Token>); let entity = await repo.findOne(findOptions as FindOneOptions<Token>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -173,8 +173,16 @@ export class Database {
async getTokenNoTx ({ id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> { async getTokenNoTx ({ id, blockHash }: DeepPartial<Token>): Promise<Token | undefined> {
const queryRunner = this._conn.createQueryRunner(); const queryRunner = this._conn.createQueryRunner();
let res;
try {
await queryRunner.connect(); await queryRunner.connect();
return this.getToken(queryRunner, { id, blockHash }); res = await this.getToken(queryRunner, { id, blockHash });
} finally {
await queryRunner.release();
}
return res;
} }
async getPool (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> { async getPool (queryRunner: QueryRunner, { id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
@ -200,7 +208,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Pool>); let entity = await repo.findOne(findOptions as FindOneOptions<Pool>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -208,12 +216,25 @@ export class Database {
async getPoolNoTx ({ id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> { async getPoolNoTx ({ id, blockHash, blockNumber }: DeepPartial<Pool>): Promise<Pool | undefined> {
const queryRunner = this._conn.createQueryRunner(); const queryRunner = this._conn.createQueryRunner();
let res;
try {
await queryRunner.connect(); await queryRunner.connect();
return this.getPool(queryRunner, { id, blockHash, blockNumber }); res = await this.getPool(queryRunner, { id, blockHash, blockNumber });
} finally {
await queryRunner.release();
}
return res;
} }
async getPosition ({ id, blockHash }: DeepPartial<Position>): Promise<Position | undefined> { async getPosition ({ id, blockHash }: DeepPartial<Position>): Promise<Position | undefined> {
const repo = this._conn.getRepository(Position); const queryRunner = this._conn.createQueryRunner();
let entity;
try {
await queryRunner.connect();
const repo = queryRunner.manager.getRepository(Position);
const whereOptions: FindConditions<Position> = { id }; const whereOptions: FindConditions<Position> = { id };
if (blockHash) { if (blockHash) {
@ -228,10 +249,13 @@ export class Database {
} }
}; };
let entity = await repo.findOne(findOptions as FindOneOptions<Position>); entity = await repo.findOne(findOptions as FindOneOptions<Position>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
}
} finally {
await queryRunner.release();
} }
return entity; return entity;
@ -256,7 +280,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Tick>); let entity = await repo.findOne(findOptions as FindOneOptions<Tick>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -264,8 +288,16 @@ export class Database {
async getTickNoTx ({ id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> { async getTickNoTx ({ id, blockHash }: DeepPartial<Tick>): Promise<Tick | undefined> {
const queryRunner = this._conn.createQueryRunner(); const queryRunner = this._conn.createQueryRunner();
let res;
try {
await queryRunner.connect(); await queryRunner.connect();
return this.getTick(queryRunner, { id, blockHash }); res = await this.getTick(queryRunner, { id, blockHash });
} finally {
await queryRunner.release();
}
return res;
} }
async getPoolDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> { async getPoolDayData (queryRunner: QueryRunner, { id, blockHash }: DeepPartial<PoolDayData>): Promise<PoolDayData | undefined> {
@ -287,7 +319,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<PoolDayData>); let entity = await repo.findOne(findOptions as FindOneOptions<PoolDayData>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -311,7 +343,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<PoolHourData>); let entity = await repo.findOne(findOptions as FindOneOptions<PoolHourData>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -335,7 +367,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<UniswapDayData>); let entity = await repo.findOne(findOptions as FindOneOptions<UniswapDayData>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -359,7 +391,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<TokenDayData>); let entity = await repo.findOne(findOptions as FindOneOptions<TokenDayData>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -383,7 +415,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<TokenHourData>); let entity = await repo.findOne(findOptions as FindOneOptions<TokenHourData>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -407,7 +439,7 @@ export class Database {
let entity = await repo.findOne(findOptions as FindOneOptions<Transaction>); let entity = await repo.findOne(findOptions as FindOneOptions<Transaction>);
if (!entity && findOptions.where.blockHash) { if (!entity && findOptions.where.blockHash) {
entity = await this._getPrevEntityVersion(repo, findOptions); entity = await this._getPrevEntityVersion(queryRunner, repo, findOptions);
} }
return entity; return entity;
@ -422,7 +454,7 @@ export class Database {
.where(`subTable.id = ${tableName}.id`); .where(`subTable.id = ${tableName}.id`);
if (block.hash) { if (block.hash) {
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(block.hash); const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, block.hash);
subQuery = subQuery subQuery = subQuery
.andWhere(new Brackets(qb => { .andWhere(new Brackets(qb => {
@ -719,8 +751,8 @@ export class Database {
}); });
} }
async getSyncStatus (): Promise<SyncStatus | undefined> { async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
const repo = this._conn.getRepository(SyncStatus); const repo = queryRunner.manager.getRepository(SyncStatus);
return repo.findOne(); return repo.findOne();
} }
@ -749,9 +781,9 @@ export class Database {
}); });
} }
async _getPrevEntityVersion<Entity> (repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> { async _getPrevEntityVersion<Entity> (queryRunner: QueryRunner, repo: Repository<Entity>, findOptions: { [key: string]: any }): Promise<Entity | undefined> {
assert(findOptions.order.blockNumber); assert(findOptions.order.blockNumber);
const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(findOptions.where.blockHash); const { canonicalBlockNumber, blockHashes } = await this._getBranchInfo(queryRunner, findOptions.where.blockHash);
findOptions.where.blockHash = In(blockHashes); findOptions.where.blockHash = In(blockHashes);
let entity = await repo.findOne(findOptions); let entity = await repo.findOne(findOptions);
@ -764,15 +796,15 @@ export class Database {
return entity; return entity;
} }
async _getBranchInfo (blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { async _getBranchInfo (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> {
const blockRepo = this._conn.getRepository(BlockProgress); const blockRepo = queryRunner.manager.getRepository(BlockProgress);
let block = await blockRepo.findOne({ blockHash }); let block = await blockRepo.findOne({ blockHash });
assert(block); assert(block);
// TODO: Should be calcualted from chainHeadBlockNumber? // TODO: Should be calcualted from chainHeadBlockNumber?
const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH; const canonicalBlockNumber = block.blockNumber - MAX_REORG_DEPTH;
const syncStatus = await this.getSyncStatus(); const syncStatus = await this.getSyncStatus(queryRunner);
assert(syncStatus); assert(syncStatus);
const blockHashes = [block.blockHash]; const blockHashes = [block.blockHash];

View File

@ -168,7 +168,20 @@ export class Indexer {
} }
async getSyncStatus (): Promise<SyncStatus | undefined> { async getSyncStatus (): Promise<SyncStatus | undefined> {
return this._db.getSyncStatus(); const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.getSyncStatus(dbTx);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
} }
async getBlock (blockHash: string): Promise<any> { async getBlock (blockHash: string): Promise<any> {