Save block parent hash (#155)

* Save block parent hash.

* Store block properties in blocks table.
This commit is contained in:
Ashwin Phatak 2021-07-21 13:00:26 +05:30 committed by GitHub
parent 10e7d37fa8
commit 95486d6553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 102 additions and 51 deletions

View File

@ -65,8 +65,26 @@ export class EthClient {
async getLogs (vars: Vars): Promise<any> { async getLogs (vars: Vars): Promise<any> {
const result = await this._getCachedOrFetch('getLogs', vars); const result = await this._getCachedOrFetch('getLogs', vars);
const { getLogs: resultLogs, block: { number: blockNumHex, timestamp: timestampHex } } = result; const {
const block = { hash: vars.blockHash, number: parseInt(blockNumHex, 16), timestamp: parseInt(timestampHex, 16) }; getLogs: resultLogs,
block: {
number: blockNumHex,
timestamp: timestampHex,
parent: {
hash: parentHash
}
}
} = result;
const block = {
hash: vars.blockHash,
number: parseInt(blockNumHex, 16),
timestamp: parseInt(timestampHex, 16),
parent: {
hash: parentHash
}
};
const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block } })); const logs = resultLogs.map((logEntry: any) => _.merge({}, logEntry, { transaction: { block } }));
return { logs, block }; return { logs, block };

View File

@ -28,6 +28,9 @@ query getLogs($blockHash: Bytes32!, $contract: Address) {
block(hash: $blockHash) { block(hash: $blockHash) {
number number
timestamp timestamp
parent {
hash
}
} }
} }
`; `;

View File

@ -32,30 +32,34 @@ export class Database {
async getBlockEvents (blockHash: string): Promise<Event[]> { async getBlockEvents (blockHash: string): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_hash = :blockHash', { blockHash }) .where('block_hash = :blockHash', { blockHash })
.addOrderBy('id', 'ASC') .addOrderBy('event.id', 'ASC')
.getMany(); .getMany();
} }
async getEvents (blockHash: string, contract: string): Promise<Event[]> { async getEvents (blockHash: string, contract: string): Promise<Event[]> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_hash = :blockHash AND contract = :contract', { .where('block_hash = :blockHash AND contract = :contract', {
blockHash, blockHash,
contract contract
}) })
.addOrderBy('id', 'ASC') .addOrderBy('event.id', 'ASC')
.getMany(); .getMany();
} }
async getEventsByName (blockHash: string, contract: string, eventName: string): Promise<Event[] | undefined> { async getEventsByName (blockHash: string, contract: string, eventName: string): Promise<Event[] | undefined> {
return this._conn.getRepository(Event) return this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', { .where('block_hash = :blockHash AND contract = :contract AND event_name = :eventName', {
blockHash, blockHash,
contract, contract,
eventName eventName
}) })
.addOrderBy('event.id', 'ASC')
.getMany(); .getMany();
} }
@ -74,40 +78,64 @@ export class Database {
} }
async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> { async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
return this._conn.getRepository(Event) const events = await this._conn.getRepository(Event)
.createQueryBuilder('event') .createQueryBuilder('event')
.innerJoinAndSelect('event.block', 'block')
.where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', { .where('block_number >= :fromBlockNumber AND block_number <= :toBlockNumber AND event_name <> :eventName', {
fromBlockNumber, fromBlockNumber,
toBlockNumber, toBlockNumber,
eventName: UNKNOWN_EVENT_NAME eventName: UNKNOWN_EVENT_NAME
}) })
.orderBy({ .addOrderBy('event.id', 'ASC')
block_number: 'ASC',
index: 'ASC'
})
.getMany(); .getMany();
return events;
} }
async saveEvents (blockHash: string, blockNumber: number, events: DeepPartial<Event>[]): Promise<void> { async saveEvents (block: any, events: DeepPartial<Event>[]): Promise<void> {
const {
hash: blockHash,
number: blockNumber,
timestamp: blockTimestamp,
parent: {
hash: parentHash
}
} = block;
assert(blockHash);
assert(blockNumber);
assert(blockTimestamp);
assert(parentHash);
// In a transaction: // In a transaction:
// (1) Save all the events in the database. // (1) Save all the events in the database.
// (2) Add an entry to the block progress table. // (2) Add an entry to the block progress table.
await this._conn.transaction(async (tx) => { await this._conn.transaction(async (tx) => {
const numEvents = events.length; const numEvents = events.length;
const blockProgressRepo = tx.getRepository(BlockProgress); const blockProgressRepo = tx.getRepository(BlockProgress);
const blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } });
if (!blockProgress) { if (!blockProgress) {
// Bulk insert events. const entity = blockProgressRepo.create({
await tx.createQueryBuilder().insert().into(Event).values(events).execute(); blockHash,
parentHash,
blockNumber,
blockTimestamp,
numEvents,
numProcessedEvents: 0,
isComplete: (numEvents === 0)
});
const entity = blockProgressRepo.create({ blockHash, blockNumber, numEvents, numProcessedEvents: 0, isComplete: (numEvents === 0) }); blockProgress = await blockProgressRepo.save(entity);
await blockProgressRepo.save(entity);
// Bulk insert events.
events.forEach(event => event.block = blockProgress);
await tx.createQueryBuilder().insert().into(Event).values(events).execute();
} }
}); });
} }
async getEvent (id: string): Promise<Event | undefined> { async getEvent (id: string): Promise<Event | undefined> {
return this._conn.getRepository(Event).findOne(id); return this._conn.getRepository(Event).findOne(id, { relations: [ 'block' ]});
} }
async saveEventEntity (entity: Event): Promise<Event> { async saveEventEntity (entity: Event): Promise<Event> {

View File

@ -2,6 +2,8 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
@Entity() @Entity()
@Index(['blockHash'], { unique: true }) @Index(['blockHash'], { unique: true })
@Index(['blockNumber'])
@Index(['parentHash'])
export class BlockProgress { export class BlockProgress {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
@ -9,9 +11,15 @@ export class BlockProgress {
@Column('varchar', { length: 66 }) @Column('varchar', { length: 66 })
blockHash!: string; blockHash!: string;
@Column('varchar', { length: 66 })
parentHash!: string;
@Column('integer') @Column('integer')
blockNumber!: number; blockNumber!: number;
@Column('integer')
blockTimestamp!: number;
@Column('integer') @Column('integer')
numEvents!: number; numEvents!: number;

View File

@ -1,25 +1,17 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm'; import { Entity, PrimaryGeneratedColumn, Column, ManyToOne, Index } from 'typeorm';
import { BlockProgress } from './BlockProgress';
export const UNKNOWN_EVENT_NAME = '__unknown__'; export const UNKNOWN_EVENT_NAME = '__unknown__';
@Entity() @Entity()
// Index to query all events for a contract efficiently. // Index to query all events for a contract efficiently.
@Index(['blockHash', 'contract']) @Index(['contract'])
// Index to query block range for uniswap events.
@Index(['blockNumber', 'eventName'])
export class Event { export class Event {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
// TODO: Denormalizing the block fields is simpler but perhaps not necessary. @ManyToOne(() => BlockProgress)
@Column('varchar', { length: 66 }) block!: BlockProgress;
blockHash!: string;
@Column('integer')
blockNumber!: number;
@Column('integer')
blockTimestamp!: number;
@Column('varchar', { length: 66 }) @Column('varchar', { length: 66 })
txHash!: string; txHash!: string;

View File

@ -63,13 +63,15 @@ export class Indexer {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block;
const eventFields = JSON.parse(event.eventInfo); const eventFields = JSON.parse(event.eventInfo);
return { return {
block: { block: {
hash: event.blockHash, hash: block.blockHash,
number: event.blockNumber, number: block.blockNumber,
timestamp: event.blockTimestamp timestamp: block.blockTimestamp,
parentHash: block.parentHash
}, },
tx: { tx: {
@ -110,9 +112,6 @@ export class Indexer {
throw new Error('Not a uniswap contract'); throw new Error('Not a uniswap contract');
} }
// Fetch block events first.
await this.getOrFetchBlockEvents(blockHash);
const events = await this._db.getEvents(blockHash, contract); const events = await this._db.getEvents(blockHash, contract);
log(`getEvents: db hit, num events: ${events.length}`); log(`getEvents: db hit, num events: ${events.length}`);
@ -132,7 +131,7 @@ export class Indexer {
switch (re.event.__typename) { switch (re.event.__typename) {
case 'PoolCreatedEvent': { case 'PoolCreatedEvent': {
const poolContract = ethers.utils.getAddress(re.event.pool); const poolContract = ethers.utils.getAddress(re.event.pool);
await this._db.saveContract(poolContract, KIND_POOL, dbEvent.blockNumber); await this._db.saveContract(poolContract, KIND_POOL, dbEvent.block.blockNumber);
} }
} }
} }
@ -305,11 +304,7 @@ export class Indexer {
address address
}, },
transaction: { transaction: {
hash: txHash, hash: txHash
block: {
number: blockNumber,
timestamp: blockTimestamp
}
} }
} = logObj; } = logObj;
@ -327,9 +322,6 @@ export class Indexer {
} }
dbEvents.push({ dbEvents.push({
blockHash,
blockNumber,
blockTimestamp,
index: logIndex, index: logIndex,
txHash, txHash,
contract, contract,
@ -348,7 +340,8 @@ export class Indexer {
}); });
} }
await this._db.saveEvents(blockHash, block.number, dbEvents);
await this._db.saveEvents(block, dbEvents);
} }
async getEvent (id: string): Promise<Event | undefined> { async getEvent (id: string): Promise<Event | undefined> {

View File

@ -61,12 +61,11 @@ export const main = async (): Promise<any> => {
await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => { await jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
const { data: { blockHash, blockNumber } } = job; const { data: { blockHash, blockNumber } } = job;
log(`Processing block ${blockHash} ${blockNumber}`); log(`Processing block number ${blockNumber} hash ${blockHash} `);
const events = await indexer.getOrFetchBlockEvents(blockHash); const events = await indexer.getOrFetchBlockEvents(blockHash);
for (let ei = 0; ei < events.length; ei++) { for (let ei = 0; ei < events.length; ei++) {
const { blockHash, id } = events[ei]; await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { id: events[ei].id, publish: true });
await jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { blockHash, id, publish: true });
} }
await jobQueue.markComplete(job); await jobQueue.markComplete(job);
@ -92,7 +91,8 @@ export const main = async (): Promise<any> => {
dbEvent = await indexer.saveEventEntity(dbEvent); dbEvent = await indexer.saveEventEntity(dbEvent);
} }
await indexer.processEvent(dbEvent); dbEvent = await indexer.getEvent(id);
await indexer.processEvent(dbEvent!);
} }
await jobQueue.markComplete(job); await jobQueue.markComplete(job);
@ -104,3 +104,7 @@ main().then(() => {
}).catch(err => { }).catch(err => {
log(err); log(err);
}); });
process.on('uncaughtException', err => {
log('uncaughtException', err);
});

View File

@ -52,8 +52,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => { events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => {
log('events', blockHash, contract, name || ''); log('events', blockHash, contract, name || '');
const blockProgress = await indexer.getBlockProgress(blockHash); const block = await indexer.getBlockProgress(blockHash);
if (!blockProgress || !blockProgress.isComplete) { if (!block || !block.isComplete) {
// TODO: Trigger indexing for the block. // TODO: Trigger indexing for the block.
throw new Error('Not available'); throw new Error('Not available');
} }

View File

@ -157,6 +157,7 @@ type Block {
hash: String! hash: String!
number: Int! number: Int!
timestamp: Int! timestamp: Int!
parentHash: String!
} }
type Transaction { type Transaction {

View File

@ -99,3 +99,7 @@ main().then(() => {
}).catch(err => { }).catch(err => {
log(err); log(err);
}); });
process.on('uncaughtException', err => {
log('uncaughtException', err);
});