Extra info for handling events (tx, block) (#136)

* Extra info in events for downstream processing.

* Changes in uni-info-watcher after change in uni-watcher event schema.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>
This commit is contained in:
Ashwin Phatak 2021-07-13 17:32:57 +05:30 committed by GitHub
parent 208b0f7f4f
commit a4f5d43bc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 283 additions and 223 deletions

View File

@ -1,4 +1,5 @@
import assert from 'assert'; import assert from 'assert';
import _ from 'lodash';
import { Cache } from '@vulcanize/cache'; import { Cache } from '@vulcanize/cache';
@ -64,9 +65,21 @@ export class EthClient {
async getLogs (vars: Vars): Promise<any> { async getLogs (vars: Vars): Promise<any> {
const result = await this._getCachedOrFetch('getLogs', vars); const result = await this._getCachedOrFetch('getLogs', vars);
const { getLogs: logs } = result; const { getLogs: logs, block: { number: blockNumHex, timestamp: timestampHex } } = result;
const blockNumber = parseInt(blockNumHex, 16);
const timestamp = parseInt(timestampHex, 16);
return logs; return logs.map((logEntry: any) => {
return _.merge({}, logEntry, {
transaction: {
block: {
hash: vars.blockHash,
number: blockNumber,
timestamp
}
}
});
});
} }
async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> { async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {

View File

@ -21,9 +21,14 @@ query getLogs($blockHash: Bytes32!, $contract: Address) {
} }
topics topics
data data
index
cid cid
ipldBlock ipldBlock
} }
block(hash: $blockHash) {
number
timestamp
}
} }
`; `;

View File

@ -14,6 +14,7 @@ import { loadTick } from './utils/tick';
const log = debug('vulcanize:events'); const log = debug('vulcanize:events');
interface PoolCreatedEvent { interface PoolCreatedEvent {
__typename: 'PoolCreatedEvent';
token0: string; token0: string;
token1: string; token1: string;
fee: bigint; fee: bigint;
@ -22,11 +23,13 @@ interface PoolCreatedEvent {
} }
interface InitializeEvent { interface InitializeEvent {
__typename: 'InitializeEvent';
sqrtPriceX96: bigint; sqrtPriceX96: bigint;
tick: bigint; tick: bigint;
} }
interface MintEvent { interface MintEvent {
__typename: 'MintEvent';
sender: string; sender: string;
owner: string; owner: string;
tickLower: bigint; tickLower: bigint;
@ -37,6 +40,7 @@ interface MintEvent {
} }
interface BurnEvent { interface BurnEvent {
__typename: 'BurnEvent';
owner: string; owner: string;
tickLower: bigint; tickLower: bigint;
tickUpper: bigint; tickUpper: bigint;
@ -45,13 +49,24 @@ interface BurnEvent {
amount1: bigint; amount1: bigint;
} }
interface Block {
number: number;
hash: string;
timestamp: number;
}
interface Transaction {
hash: string;
from: string;
}
interface ResultEvent { interface ResultEvent {
block: Block;
tx: Transaction;
contract: string;
event: PoolCreatedEvent | InitializeEvent | MintEvent | BurnEvent;
proof: { proof: {
data: string data: string;
}
event: {
__typename: string;
[key: string]: any;
} }
} }
@ -82,29 +97,29 @@ export class EventWatcher {
} }
} }
async _handleEvents ({ blockHash, blockNumber, contract, txHash, event }: { blockHash: string, blockNumber: number, contract: string, txHash: string, event: ResultEvent}): Promise<void> { async _handleEvents ({ block, tx, contract, event }: ResultEvent): Promise<void> {
// TODO: Process proof (proof.data) in event. // TODO: Process proof (proof.data) in event.
const { event: { __typename: eventType, ...eventValues } } = event; const { __typename: eventType } = event;
switch (eventType) { switch (eventType) {
case 'PoolCreatedEvent': case 'PoolCreatedEvent':
log('Factory PoolCreated event', contract); log('Factory PoolCreated event', contract);
this._handlePoolCreated(blockHash, blockNumber, contract, txHash, eventValues as PoolCreatedEvent); this._handlePoolCreated(block, contract, tx, event as PoolCreatedEvent);
break; break;
case 'InitializeEvent': case 'InitializeEvent':
log('Pool Initialize event', contract); log('Pool Initialize event', contract);
this._handleInitialize(blockHash, blockNumber, contract, txHash, eventValues as InitializeEvent); this._handleInitialize(block, contract, tx, event as InitializeEvent);
break; break;
case 'MintEvent': case 'MintEvent':
log('Pool Mint event', contract); log('Pool Mint event', contract);
this._handleMint(blockHash, blockNumber, contract, txHash, eventValues as MintEvent); this._handleMint(block, contract, tx, event as MintEvent);
break; break;
case 'BurnEvent': case 'BurnEvent':
log('Pool Burn event', contract); log('Pool Burn event', contract);
this._handleBurn(blockHash, blockNumber, contract, txHash, eventValues as BurnEvent); this._handleBurn(block, contract, tx, event as BurnEvent);
break; break;
default: default:
@ -112,7 +127,8 @@ export class EventWatcher {
} }
} }
async _handlePoolCreated (blockHash: string, blockNumber: number, contractAddress: string, txHash: string, poolCreatedEvent: PoolCreatedEvent): Promise<void> { async _handlePoolCreated (block: Block, contractAddress: string, tx: Transaction, poolCreatedEvent: PoolCreatedEvent): Promise<void> {
const { number: blockNumber, hash: blockHash } = block;
const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent; const { token0: token0Address, token1: token1Address, fee, pool: poolAddress } = poolCreatedEvent;
// Load factory. // Load factory.
@ -184,7 +200,8 @@ export class EventWatcher {
}); });
} }
async _handleInitialize (blockHash: string, blockNumber: number, contractAddress: string, txHash: string, initializeEvent: InitializeEvent): Promise<void> { async _handleInitialize (block: Block, contractAddress: string, tx: Transaction, initializeEvent: InitializeEvent): Promise<void> {
const { number: blockNumber, timestamp: blockTimestamp } = block;
const { sqrtPriceX96, tick } = initializeEvent; const { sqrtPriceX96, tick } = initializeEvent;
const pool = await this._db.getPool({ id: contractAddress, blockNumber }); const pool = await this._db.getPool({ id: contractAddress, blockNumber });
assert(pool, `Pool ${contractAddress} not found.`); assert(pool, `Pool ${contractAddress} not found.`);
@ -199,8 +216,8 @@ export class EventWatcher {
bundle.ethPriceUSD = await getEthPriceInUSD(this._db); bundle.ethPriceUSD = await getEthPriceInUSD(this._db);
this._db.saveBundle(bundle, blockNumber); this._db.saveBundle(bundle, blockNumber);
await updatePoolDayData(this._db, { contractAddress, blockNumber }); await updatePoolDayData(this._db, { contractAddress, blockNumber, blockTimestamp });
await updatePoolHourData(this._db, { contractAddress, blockNumber }); await updatePoolHourData(this._db, { contractAddress, blockNumber, blockTimestamp });
const [token0, token1] = await Promise.all([ const [token0, token1] = await Promise.all([
this._db.getToken({ id: pool.token0.id, blockNumber }), this._db.getToken({ id: pool.token0.id, blockNumber }),
@ -219,7 +236,9 @@ export class EventWatcher {
]); ]);
} }
async _handleMint (blockHash: string, blockNumber: number, contractAddress: string, txHash: string, mintEvent: MintEvent): Promise<void> { async _handleMint (block: Block, contractAddress: string, tx: Transaction, mintEvent: MintEvent): Promise<void> {
const { number: blockNumber, timestamp: blockTimestamp } = block;
const { hash: txHash } = tx;
const bundle = await this._db.loadBundle({ id: '1', blockNumber }); const bundle = await this._db.loadBundle({ id: '1', blockNumber });
const poolAddress = contractAddress; const poolAddress = contractAddress;
const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); const pool = await this._db.loadPool({ id: poolAddress, blockNumber });
@ -279,7 +298,7 @@ export class EventWatcher {
factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH); factory.totalValueLockedETH = factory.totalValueLockedETH.plus(pool.totalValueLockedETH);
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
const transaction = await loadTransaction(this._db, { txHash, blockNumber }); const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
await this._db.loadMint({ await this._db.loadMint({
id: transaction.id + '#' + pool.txCount.toString(), id: transaction.id + '#' + pool.txCount.toString(),
@ -322,13 +341,13 @@ export class EventWatcher {
// TODO: Update Tick's volume, fees, and liquidity provider count. // TODO: Update Tick's volume, fees, and liquidity provider count.
// Computing these on the tick level requires reimplementing some of the swapping code from v3-core. // Computing these on the tick level requires reimplementing some of the swapping code from v3-core.
await updateUniswapDayData(this._db, { blockNumber, contractAddress }); await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updatePoolDayData(this._db, { blockNumber, contractAddress }); await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updatePoolHourData(this._db, { blockNumber, contractAddress }); await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updateTokenDayData(this._db, token0, { blockNumber }); await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
await updateTokenDayData(this._db, token1, { blockNumber }); await updateTokenDayData(this._db, token1, { blockNumber, blockTimestamp });
await updateTokenHourData(this._db, token0, { blockNumber }); await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
await updateTokenHourData(this._db, token1, { blockNumber }); await updateTokenHourData(this._db, token1, { blockNumber, blockTimestamp });
await Promise.all([ await Promise.all([
this._db.saveToken(token0, blockNumber), this._db.saveToken(token0, blockNumber),
@ -346,7 +365,9 @@ export class EventWatcher {
// Skipping update inner tick vars and tick day data as they are not queried. // Skipping update inner tick vars and tick day data as they are not queried.
} }
async _handleBurn (blockHash: string, blockNumber: number, contractAddress: string, txHash: string, burnEvent: BurnEvent): Promise<void> { async _handleBurn (block: Block, contractAddress: string, tx: Transaction, burnEvent: BurnEvent): Promise<void> {
const { number: blockNumber, timestamp: blockTimestamp } = block;
const { hash: txHash } = tx;
const bundle = await this._db.loadBundle({ id: '1', blockNumber }); const bundle = await this._db.loadBundle({ id: '1', blockNumber });
const poolAddress = contractAddress; const poolAddress = contractAddress;
const pool = await this._db.loadPool({ id: poolAddress, blockNumber }); const pool = await this._db.loadPool({ id: poolAddress, blockNumber });
@ -407,7 +428,7 @@ export class EventWatcher {
factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD); factory.totalValueLockedUSD = factory.totalValueLockedETH.times(bundle.ethPriceUSD);
// Burn entity. // Burn entity.
const transaction = await loadTransaction(this._db, { txHash, blockNumber }); const transaction = await loadTransaction(this._db, { txHash, blockNumber, blockTimestamp });
await this._db.loadBurn({ await this._db.loadBurn({
id: transaction.id + '#' + pool.txCount.toString(), id: transaction.id + '#' + pool.txCount.toString(),
@ -441,13 +462,13 @@ export class EventWatcher {
upperTick.liquidityGross = upperTick.liquidityGross - amount; upperTick.liquidityGross = upperTick.liquidityGross - amount;
upperTick.liquidityNet = upperTick.liquidityNet + amount; upperTick.liquidityNet = upperTick.liquidityNet + amount;
await updateUniswapDayData(this._db, { blockNumber, contractAddress }); await updateUniswapDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updatePoolDayData(this._db, { blockNumber, contractAddress }); await updatePoolDayData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updatePoolHourData(this._db, { blockNumber, contractAddress }); await updatePoolHourData(this._db, { blockNumber, contractAddress, blockTimestamp });
await updateTokenDayData(this._db, token0, { blockNumber }); await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
await updateTokenDayData(this._db, token0, { blockNumber }); await updateTokenDayData(this._db, token0, { blockNumber, blockTimestamp });
await updateTokenHourData(this._db, token0, { blockNumber }); await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
await updateTokenHourData(this._db, token0, { blockNumber }); await updateTokenHourData(this._db, token0, { blockNumber, blockTimestamp });
// Skipping update Tick fee and Tick day data as they are not queried. // Skipping update Tick fee and Tick day data as they are not queried.

View File

@ -22,21 +22,17 @@ export const convertTokenToDecimal = (tokenAmount: bigint, exchangeDecimals: big
return (new Decimal(tokenAmount.toString())).div(exponentToBigDecimal(exchangeDecimals)); return (new Decimal(tokenAmount.toString())).div(exponentToBigDecimal(exchangeDecimals));
}; };
export const loadTransaction = async (db: Database, event: { txHash: string, blockNumber: number }): Promise<Transaction> => { export const loadTransaction = async (db: Database, event: { txHash: string, blockNumber: number, blockTimestamp: number }): Promise<Transaction> => {
const { txHash, blockNumber } = event; const { txHash, blockNumber, blockTimestamp } = event;
// TODO: Get block timestamp from event.
// transaction.timestamp = event.block.timestamp
const timestamp = BigInt(Math.floor(Date.now() / 1000)); // Unix timestamp.
const transaction = await db.loadTransaction({ const transaction = await db.loadTransaction({
id: txHash, id: txHash,
blockNumber, blockNumber,
timestamp timestamp: BigInt(blockTimestamp)
}); });
transaction.blockNumber = blockNumber; transaction.blockNumber = blockNumber;
transaction.timestamp = timestamp; transaction.timestamp = BigInt(blockTimestamp);
return db.saveTransaction(transaction, blockNumber); return db.saveTransaction(transaction, blockNumber);
}; };

View File

@ -13,17 +13,14 @@ import { UniswapDayData } from '../entity/UniswapDayData';
* @param db * @param db
* @param event * @param event
*/ */
export const updateUniswapDayData = async (db: Database, event: { contractAddress: string, blockNumber: number }): Promise<UniswapDayData> => { export const updateUniswapDayData = async (db: Database, event: { contractAddress: string, blockNumber: number, blockTimestamp: number }): Promise<UniswapDayData> => {
const { blockNumber } = event; const { blockNumber, blockTimestamp } = event;
// TODO: In subgraph factory is fetched by hardcoded factory address. // TODO: In subgraph factory is fetched by hardcoded factory address.
// Currently fetching first factory in database as only one exists. // Currently fetching first factory in database as only one exists.
const [factory] = await db.getFactories({ blockNumber }, { limit: 1 }); const [factory] = await db.getFactories({ blockNumber }, { limit: 1 });
// TODO: Get block timestamp from event. const dayID = Math.floor(blockTimestamp / 86400); // Rounded.
// let timestamp = event.block.timestamp.toI32()
const timestamp = Math.floor(Date.now() / 1000); // Unix timestamp.
const dayID = Math.floor(timestamp / 86400); // Rounded.
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const uniswapDayData = await db.loadUniswapDayData({ const uniswapDayData = await db.loadUniswapDayData({
@ -39,14 +36,9 @@ export const updateUniswapDayData = async (db: Database, event: { contractAddres
return db.saveUniswapDayData(uniswapDayData, blockNumber); return db.saveUniswapDayData(uniswapDayData, blockNumber);
}; };
export const updatePoolDayData = async (db: Database, event: { contractAddress: string, blockNumber: number }): Promise<PoolDayData> => { export const updatePoolDayData = async (db: Database, event: { contractAddress: string, blockNumber: number, blockTimestamp: number }): Promise<PoolDayData> => {
const { contractAddress, blockNumber } = event; const { contractAddress, blockNumber, blockTimestamp } = event;
const dayID = Math.floor(blockTimestamp / 86400);
// TODO: Get block timestamp from event.
// let timestamp = event.block.timestamp.toI32()
const timestamp = Math.floor(Date.now() / 1000); // Unix timestamp.
const dayID = Math.floor(timestamp / 86400);
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const dayPoolID = contractAddress const dayPoolID = contractAddress
@ -88,14 +80,9 @@ export const updatePoolDayData = async (db: Database, event: { contractAddress:
return poolDayData; return poolDayData;
}; };
export const updatePoolHourData = async (db: Database, event: { contractAddress: string, blockNumber: number }): Promise<PoolHourData> => { export const updatePoolHourData = async (db: Database, event: { contractAddress: string, blockNumber: number, blockTimestamp: number }): Promise<PoolHourData> => {
const { contractAddress, blockNumber } = event; const { contractAddress, blockNumber, blockTimestamp } = event;
const hourIndex = Math.floor(blockTimestamp / 3600); // Get unique hour within unix history.
// TODO: Get block timestamp from event.
// let timestamp = event.block.timestamp.toI32()
const timestamp = Math.floor(Date.now() / 1000); // Unix timestamp.
const hourIndex = Math.floor(timestamp / 3600); // Get unique hour within unix history.
const hourStartUnix = hourIndex * 3600; // Want the rounded effect. const hourStartUnix = hourIndex * 3600; // Want the rounded effect.
const hourPoolID = contractAddress const hourPoolID = contractAddress
@ -137,15 +124,10 @@ export const updatePoolHourData = async (db: Database, event: { contractAddress:
return poolHourData; return poolHourData;
}; };
export const updateTokenDayData = async (db: Database, token: Token, event: { blockNumber: number }): Promise<TokenDayData> => { export const updateTokenDayData = async (db: Database, token: Token, event: { blockNumber: number, blockTimestamp: number }): Promise<TokenDayData> => {
const { blockNumber } = event; const { blockNumber, blockTimestamp } = event;
const bundle = await db.loadBundle({ id: '1', blockNumber }); const bundle = await db.loadBundle({ id: '1', blockNumber });
const dayID = Math.floor(blockTimestamp / 86400);
// TODO: Get block timestamp from event.
// let timestamp = event.block.timestamp.toI32()
const timestamp = Math.floor(Date.now() / 1000); // Unix timestamp.
const dayID = Math.floor(timestamp / 86400);
const dayStartTimestamp = dayID * 86400; const dayStartTimestamp = dayID * 86400;
const tokenDayID = token.id const tokenDayID = token.id
@ -183,15 +165,10 @@ export const updateTokenDayData = async (db: Database, token: Token, event: { bl
return db.saveTokenDayData(tokenDayData, blockNumber); return db.saveTokenDayData(tokenDayData, blockNumber);
}; };
export const updateTokenHourData = async (db: Database, token: Token, event: { blockNumber: number }): Promise<TokenHourData> => { export const updateTokenHourData = async (db: Database, token: Token, event: { blockNumber: number, blockTimestamp: number }): Promise<TokenHourData> => {
const { blockNumber } = event; const { blockNumber, blockTimestamp } = event;
const bundle = await db.loadBundle({ id: '1', blockNumber }); const bundle = await db.loadBundle({ id: '1', blockNumber });
const hourIndex = Math.floor(blockTimestamp / 3600); // Get unique hour within unix history.
// TODO: Get block timestamp from event.
// let timestamp = event.block.timestamp.toI32()
const timestamp = Math.floor(Date.now() / 1000); // Unix timestamp.
const hourIndex = Math.floor(timestamp / 3600); // Get unique hour within unix history.
const hourStartUnix = hourIndex * 3600; // Want the rounded effect. const hourStartUnix = hourIndex * 3600; // Want the rounded effect.
const tokenHourID = token.id const tokenHourID = token.id

View File

@ -16,48 +16,51 @@ export class Client {
gql` gql`
subscription SubscriptionReceipt { subscription SubscriptionReceipt {
onEvent { onEvent {
blockHash block {
blockNumber number
hash
timestamp
}
contract contract
txHash tx {
hash
}
proof {
data
}
event { event {
proof { __typename
data
... on PoolCreatedEvent {
token0
token1
fee
tickSpacing
pool
} }
event {
__typename
... on PoolCreatedEvent {
token0
token1
fee
tickSpacing
pool
}
... on InitializeEvent { ... on InitializeEvent {
sqrtPriceX96 sqrtPriceX96
tick tick
} }
... on MintEvent { ... on MintEvent {
sender sender
owner owner
tickLower tickLower
tickUpper tickUpper
amount amount
amount0 amount0
amount1 amount1
} }
... on BurnEvent { ... on BurnEvent {
owner owner
tickLower tickLower
tickUpper tickUpper
amount amount
amount0 amount0
amount1 amount1
}
} }
} }
} }

View File

@ -7,21 +7,37 @@ export class Event {
@PrimaryGeneratedColumn() @PrimaryGeneratedColumn()
id!: number; id!: number;
// TODO: Denormalizing the block fields is simpler but perhaps not necessary.
@Column('varchar', { length: 66 }) @Column('varchar', { length: 66 })
blockHash!: string; blockHash!: string;
@Column('integer')
blockNumber!: number;
@Column('integer')
blockTimestamp!: number;
@Column('varchar', { length: 66 }) @Column('varchar', { length: 66 })
txHash!: string; txHash!: string;
// Index of the log in the block.
@Column('integer')
index!: number;
@Column('varchar', { length: 42 }) @Column('varchar', { length: 42 })
contract!: string; contract!: string;
@Column('varchar', { length: 256 }) @Column('varchar', { length: 256 })
eventName!: string; eventName!: string;
// TODO: Polymorphic relationships?
@Column('text') @Column('text')
eventData!: string; eventInfo!: string;
@Column('text')
extraInfo!: string;
@Column('boolean', { default: false })
isProcessed!: boolean;
@Column('text') @Column('text')
proof!: string; proof!: string;

View File

@ -30,14 +30,14 @@ export class EventWatcher {
const receipt = _.get(value, 'data.listen.relatedNode'); const receipt = _.get(value, 'data.listen.relatedNode');
log('watchLogs', JSON.stringify(receipt, null, 2)); log('watchLogs', JSON.stringify(receipt, null, 2));
const blocks = []; const blocks: string[] = [];
const { logContracts } = receipt; const { logContracts } = receipt;
if (logContracts && logContracts.length) { if (logContracts && logContracts.length) {
for (let logIndex = 0; logIndex < logContracts.length; logIndex++) { for (let logIndex = 0; logIndex < logContracts.length; logIndex++) {
const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt; const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash } } } = receipt;
await this._indexer.getBlockEvents(blockHash); await this._indexer.getBlockEvents(blockHash);
blocks.push({ blockHash, blockNumber }); blocks.push(blockHash);
} }
} }
@ -48,7 +48,7 @@ export class EventWatcher {
// Process events, if from known uniswap contracts. // Process events, if from known uniswap contracts.
for (let bi = 0; bi < blocks.length; bi++) { for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber } = blocks[bi]; const blockHash = blocks[bi];
if (processedBlocks[blockHash]) { if (processedBlocks[blockHash]) {
continue; continue;
} }
@ -56,13 +56,13 @@ export class EventWatcher {
const events = await this._indexer.getBlockEvents(blockHash); const events = await this._indexer.getBlockEvents(blockHash);
for (let ei = 0; ei < events.length; ei++) { for (let ei = 0; ei < events.length; ei++) {
const eventObj = events[ei]; const eventObj = events[ei];
const uniContract = await this._indexer.isUniswapContract(eventObj.extra.contract); const uniContract = await this._indexer.isUniswapContract(eventObj.contract);
if (uniContract) { if (uniContract) {
log('event', JSON.stringify(eventObj, null, 2)); log('event', JSON.stringify(eventObj, null, 2));
// TODO: Move processing to background queue (need sequential processing of events). // TODO: Move processing to background queue (need sequential processing of events).
// Trigger other indexer methods based on event topic. // Trigger other indexer methods based on event topic.
await this._indexer.processEvent(blockHash, blockNumber, uniContract, eventObj.extra.txHash, eventObj); await this._indexer.processEvent(eventObj);
} }
} }

View File

@ -19,13 +19,17 @@ import poolABI from './artifacts/pool.json';
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
type EventResult = { type ResultEvent = {
event: any; block: any;
proof: string; tx: any;
extra: any;
};
type EventsResult = Array<EventResult>; contract: string;
eventIndex: number;
event: any;
proof: string;
};
export class Indexer { export class Indexer {
_config: Config; _config: Config;
@ -57,11 +61,11 @@ export class Indexer {
return this._pubsub.asyncIterator(['event']); return this._pubsub.asyncIterator(['event']);
} }
async getBlockEvents (blockHash: string): Promise<EventsResult> { async getBlockEvents (blockHash: string): Promise<Array<Event>> {
const didSyncEvents = await this._db.didSyncEvents({ blockHash }); const didSyncEvents = await this._db.didSyncEvents({ blockHash });
if (!didSyncEvents) { if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table. // Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash }); await this.fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server'); log('getEvents: db miss, fetching from upstream server');
} }
@ -70,30 +74,37 @@ export class Indexer {
const events = await this._db.getBlockEvents({ blockHash }); const events = await this._db.getBlockEvents({ blockHash });
log(`getEvents: db hit, num events: ${events.length}`); log(`getEvents: db hit, num events: ${events.length}`);
const result = events return events;
.map(e => {
const eventFields = JSON.parse(e.eventData);
return {
event: {
__typename: `${e.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(e.proof),
extra: {
contract: e.contract,
txHash: e.txHash
}
};
});
// log(JSONbig.stringify(result, null, 2));
return result;
} }
async getEvents (blockHash: string, contract: string, name: string | null): Promise<EventsResult> { getResultEvent (event: Event): ResultEvent {
const eventFields = JSON.parse(event.eventInfo);
return {
block: {
hash: event.blockHash,
number: event.blockNumber,
timestamp: event.blockTimestamp
},
tx: {
hash: event.txHash
},
contract: event.contract,
eventIndex: event.index,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof),
};
}
async getEvents (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
const uniContract = await this.isUniswapContract(contract); const uniContract = await this.isUniswapContract(contract);
if (!uniContract) { if (!uniContract) {
throw new Error('Not a uniswap contract'); throw new Error('Not a uniswap contract');
@ -102,7 +113,7 @@ export class Indexer {
const didSyncEvents = await this._db.didSyncEvents({ blockHash }); const didSyncEvents = await this._db.didSyncEvents({ blockHash });
if (!didSyncEvents) { if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table. // Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash }); await this.fetchAndSaveEvents({ blockHash });
log('getEvents: db miss, fetching from upstream server'); log('getEvents: db miss, fetching from upstream server');
} }
@ -114,50 +125,30 @@ export class Indexer {
const result = events const result = events
.filter(event => contract === event.contract) .filter(event => contract === event.contract)
// TODO: Filter using db WHERE condition when name is not empty. // TODO: Filter using db WHERE condition when name is not empty.
.filter(event => !name || name === event.eventName) .filter(event => !name || name === event.eventName);
.map(e => {
const eventFields = JSON.parse(e.eventData);
return {
event: {
__typename: `${e.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(e.proof),
extra: {
contract: e.contract,
txHash: e.txHash
}
};
});
// log(JSONbig.stringify(result, null, 2));
return result; return result;
} }
async triggerIndexingOnEvent (blockNumber: number, event: EventResult): Promise<void> { async triggerIndexingOnEvent (dbEvent: Event): Promise<void> {
switch (event.event.__typename) { const re = this.getResultEvent(dbEvent);
switch (re.event.__typename) {
case 'PoolCreatedEvent': { case 'PoolCreatedEvent': {
const poolContract = ethers.utils.getAddress(event.event.pool); const poolContract = ethers.utils.getAddress(re.event.pool);
await this._db.saveContract(poolContract, KIND_POOL, blockNumber); await this._db.saveContract(poolContract, KIND_POOL, dbEvent.blockNumber);
} }
} }
} }
async publishEventToSubscribers (blockHash: string, blockNumber: number, contract: string, txHash: string, event: EventResult): Promise<void> { async publishEventToSubscribers (dbEvent: Event): Promise<void> {
log(`pushing event to GQL subscribers: ${event.event.__typename}`); const resultEvent = this.getResultEvent(dbEvent);
log(`pushing event to GQL subscribers: ${resultEvent.event.__typename}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`. // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
await this._pubsub.publish('event', { await this._pubsub.publish('event', {
onEvent: { onEvent: resultEvent
blockHash,
blockNumber,
contract,
txHash,
event
}
}); });
} }
@ -165,29 +156,47 @@ export class Indexer {
return this._db.getContract(ethers.utils.getAddress(address)); return this._db.getContract(ethers.utils.getAddress(address));
} }
async processEvent (blockHash: string, blockNumber: number, contract: Contract, txHash: string, event: EventResult): Promise<void> { async processEvent (event: Event): Promise<void> {
// Trigger indexing of data based on the event. // Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(blockNumber, event); await this.triggerIndexingOnEvent(event);
// Also trigger downstream event watcher subscriptions. // Also trigger downstream event watcher subscriptions.
await this.publishEventToSubscribers(blockHash, blockNumber, contract.address, txHash, event); await this.publishEventToSubscribers(event);
} }
async _fetchAndSaveEvents ({ blockHash }: { blockHash: string }): Promise<void> { async fetchAndSaveEvents ({ blockHash }: { blockHash: string }): Promise<void> {
const logs = await this._ethClient.getLogs({ blockHash }); const logs = await this._ethClient.getLogs({ blockHash });
const dbEvents: Array<DeepPartial<Event>> = []; const dbEvents: Array<DeepPartial<Event>> = [];
for (let logIndex = 0; logIndex < logs.length; logIndex++) { for (let li = 0; li < logs.length; li++) {
const logObj = logs[logIndex]; const logObj = logs[li];
const { topics, data, cid, ipldBlock, account: { address }, transaction: { hash: txHash } } = logObj; const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash,
block: {
number: blockNumber,
timestamp: blockTimestamp
}
}
} = logObj;
let eventName; let eventName;
let eventProps = {}; let eventInfo = {};
let extraInfo = {};
const contract = ethers.utils.getAddress(address); const contract = ethers.utils.getAddress(address);
const uniContract = await this.isUniswapContract(contract); const uniContract = await this.isUniswapContract(contract);
if (!uniContract) { if (!uniContract) {
// TODO: Can only be known if events are processed serially.
continue; continue;
} }
@ -198,7 +207,7 @@ export class Indexer {
case 'PoolCreated': { case 'PoolCreated': {
eventName = logDescription.name; eventName = logDescription.name;
const { token0, token1, fee, tickSpacing, pool } = logDescription.args; const { token0, token1, fee, tickSpacing, pool } = logDescription.args;
eventProps = { token0, token1, fee, tickSpacing, pool }; eventInfo = { token0, token1, fee, tickSpacing, pool };
break; break;
} }
@ -212,14 +221,14 @@ export class Indexer {
case 'Initialize': { case 'Initialize': {
eventName = logDescription.name; eventName = logDescription.name;
const { sqrtPriceX96, tick } = logDescription.args; const { sqrtPriceX96, tick } = logDescription.args;
eventProps = { sqrtPriceX96: sqrtPriceX96.toString(), tick }; eventInfo = { sqrtPriceX96: sqrtPriceX96.toString(), tick };
break; break;
} }
case 'Mint': { case 'Mint': {
eventName = logDescription.name; eventName = logDescription.name;
const { sender, owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; const { sender, owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args;
eventProps = { eventInfo = {
sender, sender,
owner, owner,
tickLower, tickLower,
@ -234,7 +243,7 @@ export class Indexer {
case 'Burn': { case 'Burn': {
eventName = logDescription.name; eventName = logDescription.name;
const { owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args; const { owner, tickLower, tickUpper, amount, amount0, amount1 } = logDescription.args;
eventProps = { eventInfo = {
owner, owner,
tickLower, tickLower,
tickUpper, tickUpper,
@ -248,7 +257,7 @@ export class Indexer {
case 'Swap': { case 'Swap': {
eventName = logDescription.name; eventName = logDescription.name;
const { sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick } = logDescription.args; const { sender, recipient, amount0, amount1, sqrtPriceX96, liquidity, tick } = logDescription.args;
eventProps = { eventInfo = {
sender, sender,
recipient, recipient,
amount0: amount0.toString(), amount0: amount0.toString(),
@ -269,10 +278,14 @@ export class Indexer {
if (eventName) { if (eventName) {
dbEvents.push({ dbEvents.push({
blockHash, blockHash,
blockNumber,
blockTimestamp,
index: logIndex,
txHash, txHash,
contract, contract,
eventName, eventName,
eventData: JSONbig.stringify({ ...eventProps }), eventInfo: JSONbig.stringify(eventInfo),
extraInfo: JSONbig.stringify(extraInfo),
proof: JSONbig.stringify({ proof: JSONbig.stringify({
data: JSONbig.stringify({ data: JSONbig.stringify({
blockHash, blockHash,

View File

@ -54,7 +54,9 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => { events: async (_: any, { blockHash, contract, name }: { blockHash: string, contract: string, name: string }) => {
log('events', blockHash, contract, name || ''); log('events', blockHash, contract, name || '');
return indexer.getEvents(blockHash, contract, name); const events = await indexer.getEvents(blockHash, contract, name);
return events.map(event => indexer.getResultEvent(event));
} }
} }
}; };

View File

@ -12,14 +12,6 @@ type Proof {
data: String! data: String!
} }
# Result type, with proof, for string method return values.
type ResultString {
value: String
# Proof from state/storage trie.
proof: Proof
}
# Result type, with proof, for uint256 method return values. # Result type, with proof, for uint256 method return values.
type ResultUInt256 { type ResultUInt256 {
value: BigInt! value: BigInt!
@ -162,23 +154,39 @@ union PoolEvent = InitializeEvent | MintEvent | BurnEvent | SwapEvent
# All events emitted by the watcher. # All events emitted by the watcher.
union Event = TransferEvent | PoolCreatedEvent | IncreaseLiquidityEvent | DecreaseLiquidityEvent | CollectEvent | InitializeEvent | MintEvent | BurnEvent | SwapEvent union Event = TransferEvent | PoolCreatedEvent | IncreaseLiquidityEvent | DecreaseLiquidityEvent | CollectEvent | InitializeEvent | MintEvent | BurnEvent | SwapEvent
# Result type, with proof, for event return values. # Ethereum types
type Block {
hash: String!
number: Int!
timestamp: Int!
}
type Transaction {
hash: String!
index: Int!
from: String!
to: String!
}
# Result event, include additional context over and above the event data.
type ResultEvent { type ResultEvent {
# Block and tx data for the event.
block: Block!
tx: Transaction!
# Contract that generated the event.
contract: String!
# Index of the event in the block.
eventIndex: Int!
event: Event! event: Event!
# Proof from receipts trie. # Proof from receipts trie.
proof: Proof proof: Proof
} }
# Watched event, include additional context over and above the event data.
type WatchedEvent {
blockHash: String!
blockNumber: Int!
contract: String!
txHash: String!
event: ResultEvent!
}
# #
# Queries # Queries
@ -224,6 +232,12 @@ type Query {
contract: String! contract: String!
name: String name: String
): [ResultEvent!] ): [ResultEvent!]
# Get uniswap events in a given block range.
eventsInRange(
fromBlockNumber: Int!
toBlockNumber: Int!
): [ResultEvent!]
} }
# #
@ -232,6 +246,6 @@ type Query {
type Subscription { type Subscription {
# Watch for events (at head of chain). # Watch for events (at head of chain).
onEvent: WatchedEvent! onEvent: ResultEvent!
} }
`; `;