diff --git a/packages/ipld-eth-client/src/eth-client.ts b/packages/ipld-eth-client/src/eth-client.ts index 93c71334..2ba6682f 100644 --- a/packages/ipld-eth-client/src/eth-client.ts +++ b/packages/ipld-eth-client/src/eth-client.ts @@ -65,8 +65,26 @@ export class EthClient { async getLogs (vars: Vars): Promise { const result = await this._getCachedOrFetch('getLogs', vars); - const { getLogs: resultLogs, block: { number: blockNumHex, timestamp: timestampHex } } = result; - const block = { hash: vars.blockHash, number: parseInt(blockNumHex, 16), timestamp: parseInt(timestampHex, 16) }; + const { + getLogs: resultLogs, + block: { + number: blockNumHex, + timestamp: timestampHex, + parent: { + hash: parentHash + } + } + } = result; + + const block = { + hash: vars.blockHash, + number: parseInt(blockNumHex, 16), + timestamp: parseInt(timestampHex, 16), + parent: { + hash: parentHash + } + }; + const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block } })); return { logs, block }; diff --git a/packages/ipld-eth-client/src/eth-queries.ts b/packages/ipld-eth-client/src/eth-queries.ts index f4052cee..906a3764 100644 --- a/packages/ipld-eth-client/src/eth-queries.ts +++ b/packages/ipld-eth-client/src/eth-queries.ts @@ -28,6 +28,9 @@ query getLogs($blockHash: Bytes32!, $contract: Address) { block(hash: $blockHash) { number timestamp + parent { + hash + } } } `; diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index cf7dba39..f56de579 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -32,30 +32,34 @@ export class Database { async getBlockEvents (blockHash: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_hash = :blockHash', { blockHash }) - .addOrderBy('id', 'ASC') + .addOrderBy('event.id', 'ASC') .getMany(); } async getEvents (blockHash: string, contract: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_hash = :blockHash AND contract = :contract', { blockHash, contract }) - .addOrderBy('id', 'ASC') + .addOrderBy('event.id', 'ASC') .getMany(); } async getEventsByName (blockHash: string, contract: string, eventName: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', { blockHash, contract, eventName }) + .addOrderBy('event.id', 'ASC') .getMany(); } @@ -74,40 +78,64 @@ export class Database { } async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - return this._conn.getRepository(Event) + const events = await this._conn.getRepository(Event) .createQueryBuilder('event') + .innerJoinAndSelect('event.block', 'block') .where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', { fromBlockNumber, toBlockNumber, eventName: UNKNOWN_EVENT_NAME }) - .orderBy({ - block_number: 'ASC', - index: 'ASC' - }) + .addOrderBy('event.id', 'ASC') .getMany(); + + return events; } - async saveEvents (blockHash: string, blockNumber: number, events: DeepPartial[]): Promise { + async saveEvents (block: any, events: DeepPartial[]): Promise { + const { + hash: blockHash, + number: blockNumber, + timestamp: blockTimestamp, + parent: { + hash: parentHash + } + } = block; + + assert(blockHash); + assert(blockNumber); + assert(blockTimestamp); + assert(parentHash); + // In a transaction: // (1) Save all the events in the database. // (2) Add an entry to the block progress table. await this._conn.transaction(async (tx) => { const numEvents = events.length; const blockProgressRepo = tx.getRepository(BlockProgress); - const blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); + let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); if (!blockProgress) { - // Bulk insert events. - await tx.createQueryBuilder().insert().into(Event).values(events).execute(); + const entity = blockProgressRepo.create({ + blockHash, + parentHash, + blockNumber, + blockTimestamp, + numEvents, + numProcessedEvents: 0, + isComplete: (numEvents === 0) + }); - const entity = blockProgressRepo.create({ blockHash, blockNumber, numEvents, numProcessedEvents: 0, isComplete: (numEvents === 0) }); - await blockProgressRepo.save(entity); + blockProgress = await blockProgressRepo.save(entity); + + // Bulk insert events. + events.forEach(event => event.block = blockProgress); + await tx.createQueryBuilder().insert().into(Event).values(events).execute(); } }); } async getEvent (id: string): Promise { - return this._conn.getRepository(Event).findOne(id); + return this._conn.getRepository(Event).findOne(id, { relations: [ 'block' ]}); } async saveEventEntity (entity: Event): Promise { diff --git a/packages/uni-watcher/src/entity/BlockProgress.ts b/packages/uni-watcher/src/entity/BlockProgress.ts index b37871e7..ef732154 100644 --- a/packages/uni-watcher/src/entity/BlockProgress.ts +++ b/packages/uni-watcher/src/entity/BlockProgress.ts @@ -2,6 +2,8 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; @Entity() @Index(['blockHash'], { unique: true }) +@Index(['blockNumber']) +@Index(['parentHash']) export class BlockProgress { @PrimaryGeneratedColumn() id!: number; @@ -9,9 +11,15 @@ export class BlockProgress { @Column('varchar', { length: 66 }) blockHash!: string; + @Column('varchar', { length: 66 }) + parentHash!: string; + @Column('integer') blockNumber!: number; + @Column('integer') + blockTimestamp!: number; + @Column('integer') numEvents!: number; diff --git a/packages/uni-watcher/src/entity/Event.ts b/packages/uni-watcher/src/entity/Event.ts index 40461e0a..02ca4723 100644 --- a/packages/uni-watcher/src/entity/Event.ts +++ b/packages/uni-watcher/src/entity/Event.ts @@ -1,25 +1,17 @@ -import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; +import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, Index } from 'typeorm'; +import { BlockProgress } from './BlockProgress'; export const UNKNOWN_EVENT_NAME = '__unknown__'; @Entity() // Index to query all events for a contract efficiently. -@Index(['blockHash', 'contract']) -// Index to query block range for uniswap events. -@Index(['blockNumber', 'eventName']) +@Index(['contract']) export class Event { @PrimaryGeneratedColumn() id!: number; - // TODO: Denormalizing the block fields is simpler but perhaps not necessary. - @Column('varchar', { length: 66 }) - blockHash!: string; - - @Column('integer') - blockNumber!: number; - - @Column('integer') - blockTimestamp!: number; + @ManyToOne(() => BlockProgress) + block!: BlockProgress; @Column('varchar', { length: 66 }) txHash!: string; diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 420076f8..db69239a 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -63,13 +63,15 @@ export class Indexer { } getResultEvent (event: Event): ResultEvent { + const block = event.block; const eventFields = JSON.parse(event.eventInfo); return { block: { - hash: event.blockHash, - number: event.blockNumber, - timestamp: event.blockTimestamp + hash: block.blockHash, + number: block.blockNumber, + timestamp: block.blockTimestamp, + parentHash: block.parentHash }, tx: { @@ -110,9 +112,6 @@ export class Indexer { throw new Error('Not a uniswap contract'); } - // Fetch block events first. - await this.getOrFetchBlockEvents(blockHash); - const events = await this._db.getEvents(blockHash, contract); log(`getEvents: db hit, num events: ${events.length}`); @@ -132,7 +131,7 @@ export class Indexer { switch (re.event.__typename) { case 'PoolCreatedEvent': { const poolContract = ethers.utils.getAddress(re.event.pool); - await this._db.saveContract(poolContract, KIND_POOL, dbEvent.blockNumber); + await this._db.saveContract(poolContract, KIND_POOL, dbEvent.block.blockNumber); } } } @@ -305,11 +304,7 @@ export class Indexer { address }, transaction: { - hash: txHash, - block: { - number: blockNumber, - timestamp: blockTimestamp - } + hash: txHash } } = logObj; @@ -327,9 +322,6 @@ export class Indexer { } dbEvents.push({ - blockHash, - blockNumber, - blockTimestamp, index: logIndex, txHash, contract, @@ -348,7 +340,8 @@ export class Indexer { }); } - await this._db.saveEvents(blockHash, block.number, dbEvents); + + await this._db.saveEvents(block, dbEvents); } async getEvent (id: string): Promise { diff --git a/packages/uni-watcher/src/job-runner.ts b/packages/uni-watcher/src/job-runner.ts index 9fe33a01..17397e87 100644 --- a/packages/uni-watcher/src/job-runner.ts +++ b/packages/uni-watcher/src/job-runner.ts @@ -61,12 +61,11 @@ export const main = async (): Promise => { await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { const { data: { blockHash, blockNumber } } = job; - log(`Processing block ${blockHash} ${blockNumber}`); + log(`Processing block number ${blockNumber} hash ${blockHash} `); const events = await indexer.getOrFetchBlockEvents(blockHash); for (let ei = 0; ei < events.length; ei++) { - const { blockHash, id } = events[ei]; - await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { blockHash, id, publish: true }); + await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true }); } await jobQueue.markComplete(job); @@ -92,7 +91,8 @@ export const main = async (): Promise => { dbEvent = await indexer.saveEventEntity(dbEvent); } - await indexer.processEvent(dbEvent); + dbEvent = await indexer.getEvent(id); + await indexer.processEvent(dbEvent!); } await jobQueue.markComplete(job); @@ -104,3 +104,7 @@ main().then(() => { }).catch(err => { log(err); }); + +process.on('uncaughtException', err => { + log('uncaughtException', err); +}); diff --git a/packages/uni-watcher/src/resolvers.ts b/packages/uni-watcher/src/resolvers.ts index 2abb06dd..0d6fcb03 100644 --- a/packages/uni-watcher/src/resolvers.ts +++ b/packages/uni-watcher/src/resolvers.ts @@ -52,8 +52,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => { log('events', blockHash, contract, name || ''); - const blockProgress = await indexer.getBlockProgress(blockHash); - if (!blockProgress || !blockProgress.isComplete) { + const block = await indexer.getBlockProgress(blockHash); + if (!block || !block.isComplete) { // TODO: Trigger indexing for the block. throw new Error('Not available'); } diff --git a/packages/uni-watcher/src/schema.ts b/packages/uni-watcher/src/schema.ts index 3b10b76e..86dfa560 100644 --- a/packages/uni-watcher/src/schema.ts +++ b/packages/uni-watcher/src/schema.ts @@ -157,6 +157,7 @@ type Block { hash: String! number: Int! timestamp: Int! + parentHash: String! } type Transaction { diff --git a/packages/uni-watcher/src/server.ts b/packages/uni-watcher/src/server.ts index 14727be3..a474e1f7 100644 --- a/packages/uni-watcher/src/server.ts +++ b/packages/uni-watcher/src/server.ts @@ -99,3 +99,7 @@ main().then(() => { }).catch(err => { log(err); }); + +process.on('uncaughtException', err => { + log('uncaughtException', err); +});