Event processing/sync changes to maintain processing order. (#131)

This commit is contained in:
Ashwin Phatak 2021-07-12 17:06:33 +05:30 committed by GitHub
parent 69c68b365f
commit d71557e963
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 116 additions and 42 deletions

View File

@@ -12,7 +12,7 @@ interface Config extends GraphQLConfig {
interface Vars {
blockHash: string;
contract: string;
contract?: string;
slot?: string;
}

View File

@@ -11,11 +11,14 @@ query getStorageAt($blockHash: Bytes32!, $contract: Address!, $slot: Bytes32!) {
`;
export const getLogs = gql`
query getLogs($blockHash: Bytes32!, $contract: Address!) {
query getLogs($blockHash: Bytes32!, $contract: Address) {
getLogs(blockHash: $blockHash, contract: $contract) {
account {
address
}
transaction {
hash
}
topics
data
cid

View File

@@ -29,18 +29,23 @@ export class Database {
}
// Returns true if events have already been synced for the (block, token) combination.
async didSyncEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<boolean> {
async didSyncEvents ({ blockHash }: { blockHash: string }): Promise<boolean> {
const numRows = await this._conn.getRepository(EventSyncProgress)
.createQueryBuilder()
.where('block_hash = :blockHash AND contract = :contract', {
blockHash,
contract
})
.where('block_hash = :blockHash', { blockHash })
.getCount();
return numRows > 0;
}
async getBlockEvents ({ blockHash }: { blockHash: string }): Promise<Event[]> {
return this._conn.getRepository(Event)
.createQueryBuilder('event')
.where('block_hash = :blockHash', { blockHash })
.addOrderBy('id', 'ASC')
.getMany();
}
async getEvents ({ blockHash, contract }: { blockHash: string, contract: string }): Promise<Event[]> {
return this._conn.getRepository(Event)
.createQueryBuilder('event')
@@ -63,7 +68,7 @@ export class Database {
.getMany();
}
async saveEvents ({ blockHash, contract, events }: { blockHash: string, contract: string, events: DeepPartial<Event>[] }): Promise<void> {
async saveEvents ({ blockHash, events }: { blockHash: string, events: DeepPartial<Event>[] }): Promise<void> {
// In a transaction:
// (1) Save all the events in the database.
// (2) Add an entry to the event progress table.
@@ -74,10 +79,7 @@
// Check sync progress inside the transaction.
const numRows = await repo
.createQueryBuilder()
.where('block_hash = :blockHash AND contract = :contract', {
blockHash,
contract
})
.where('block_hash = :blockHash', { blockHash })
.getCount();
if (numRows === 0) {
@@ -89,7 +91,7 @@ export class Database {
.execute();
// Update event sync progress.
const progress = repo.create({ blockHash, contract });
const progress = repo.create({ blockHash });
await repo.save(progress);
}
});

View File

@@ -10,6 +10,9 @@ export class Event {
@Column('varchar', { length: 66 })
blockHash!: string;
@Column('varchar', { length: 66 })
txHash!: string;
@Column('varchar', { length: 42 })
contract!: string;

View File

@@ -7,14 +7,11 @@ import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
// yet been synced from upstream.
//
@Entity()
@Index(['blockHash', 'contract'], { unique: true })
@Index(['blockHash'], { unique: true })
export class EventSyncProgress {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar', { length: 66 })
blockHash!: string;
@Column('varchar', { length: 42 })
contract!: string;
}

View File

@@ -30,21 +30,43 @@ export class EventWatcher {
const receipt = _.get(value, 'data.listen.relatedNode');
log('watchLogs', JSON.stringify(receipt, null, 2));
// Check if this log is for a contract we care about.
const blocks = [];
const { logContracts } = receipt;
if (logContracts && logContracts.length) {
for (let logIndex = 0; logIndex < logContracts.length; logIndex++) {
const contractAddress = logContracts[logIndex];
const uniContract = await this._indexer.isUniswapContract(contractAddress);
if (uniContract) {
const { ethTransactionCidByTxId: { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt;
const events = await this._indexer.getEvents(blockHash, contractAddress, null);
const event = events[logIndex];
const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt;
await this._indexer.getBlockEvents(blockHash);
blocks.push({ blockHash, blockNumber });
}
}
const processedBlocks: any = {};
if (!blocks.length) {
return;
}
// Process events, if from known uniswap contracts.
for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber } = blocks[bi];
if (processedBlocks[blockHash]) {
continue;
}
const events = await this._indexer.getBlockEvents(blockHash);
for (let ei = 0; ei < events.length; ei++) {
const eventObj = events[ei];
const uniContract = await this._indexer.isUniswapContract(eventObj.extra.contract);
if (uniContract) {
log('event', JSON.stringify(eventObj, null, 2));
// TODO: Move processing to background queue (need sequential processing of events).
// Trigger other indexer methods based on event topic.
await this._indexer.processEvent(blockHash, blockNumber, uniContract, txHash, receipt, event);
await this._indexer.processEvent(blockHash, blockNumber, uniContract, eventObj.extra.txHash, eventObj);
}
}
processedBlocks[blockHash] = true;
}
});
}

View File

@@ -22,6 +22,7 @@ const log = debug('vulcanize:indexer');
type EventResult = {
event: any;
proof: string;
extra: any;
};
type EventsResult = Array<EventResult>;
@@ -56,25 +57,62 @@ export class Indexer {
return this._pubsub.asyncIterator(['event']);
}
async getBlockEvents (blockHash: string): Promise<EventsResult> {
const didSyncEvents = await this._db.didSyncEvents({ blockHash });
if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server');
}
assert(await this._db.didSyncEvents({ blockHash }));
const events = await this._db.getBlockEvents({ blockHash });
log(`getEvents: db hit, num events: ${events.length}`);
const result = events
.map(e => {
const eventFields = JSON.parse(e.eventData);
return {
event: {
__typename: `${e.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(e.proof),
extra: {
contract: e.contract,
txHash: e.txHash
}
};
});
// log(JSONbig.stringify(result, null, 2));
return result;
}
async getEvents (blockHash: string, contract: string, name: string | null): Promise<EventsResult> {
const uniContract = await this.isUniswapContract(contract);
if (!uniContract) {
throw new Error('Not a uniswap contract');
}
const didSyncEvents = await this._db.didSyncEvents({ blockHash, contract });
const didSyncEvents = await this._db.didSyncEvents({ blockHash });
if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash, contract, uniContract });
await this._fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server');
}
assert(await this._db.didSyncEvents({ blockHash, contract }));
assert(await this._db.didSyncEvents({ blockHash }));
const events = await this._db.getEvents({ blockHash, contract });
log(`getEvents: db hit, num events: ${events.length}`);
const result = events
.filter(event => contract === event.contract)
// TODO: Filter using db WHERE condition when name is not empty.
.filter(event => !name || name === event.eventName)
.map(e => {
@@ -86,7 +124,11 @@ export class Indexer {
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(e.proof)
proof: JSON.parse(e.proof),
extra: {
contract: e.contract,
txHash: e.txHash
}
};
});
@@ -123,7 +165,7 @@ export class Indexer {
return this._db.getContract(ethers.utils.getAddress(address));
}
async processEvent (blockHash: string, blockNumber: number, contract: Contract, txHash: string, receipt: any, event: EventResult): Promise<void> {
async processEvent (blockHash: string, blockNumber: number, contract: Contract, txHash: string, event: EventResult): Promise<void> {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(blockNumber, event);
@@ -131,17 +173,24 @@ export class Indexer {
await this.publishEventToSubscribers(blockHash, blockNumber, contract.address, txHash, event);
}
async _fetchAndSaveEvents ({ blockHash, contract, uniContract }: { blockHash: string, contract: string, uniContract: Contract }): Promise<void> {
assert(uniContract);
async _fetchAndSaveEvents ({ blockHash }: { blockHash: string }): Promise<void> {
const logs = await this._ethClient.getLogs({ blockHash });
const logs = await this._ethClient.getLogs({ blockHash, contract });
const dbEvents: Array<DeepPartial<Event>> = [];
const dbEvents = logs.map((logObj: any) => {
const { topics, data, cid, ipldBlock } = logObj;
for (let logIndex = 0; logIndex < logs.length; logIndex++) {
const logObj = logs[logIndex];
const { topics, data, cid, ipldBlock, account: { address }, transaction: { hash: txHash } } = logObj;
let eventName;
let eventProps = {};
const contract = ethers.utils.getAddress(address);
const uniContract = await this.isUniswapContract(contract);
if (!uniContract) {
continue;
}
switch (uniContract.kind) {
case KIND_FACTORY: {
const logDescription = this._factoryContract.parseLog({ data, topics });
@@ -217,10 +266,10 @@
}
}
let event: DeepPartial<Event> | undefined;
if (eventName) {
event = {
dbEvents.push({
blockHash,
txHash,
contract,
eventName,
eventData: JSONbig.stringify({ ...eventProps }),
@@ -233,13 +282,11 @@
}
})
})
};
});
}
return event;
});
}
const events: DeepPartial<Event>[] = _.compact(dbEvents);
await this._db.saveEvents({ blockHash, contract, events });
await this._db.saveEvents({ blockHash, events });
}
}