Use prefetching of blocks with events in watchers and codegen (#206)

* Avoid refetching block while fetching events

* Prefetch a batch of blocks with events while indexing

* Update mock indexer used in graph-node testing

* Process available blocks while prefetching

* Refactor events fetching to a method in util

* Move method to get GQL event query result to util
This commit is contained in:
prathamesh0 2022-10-20 08:16:56 -05:00 committed by GitHub
parent 668875b3a0
commit 306bbb73ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 436 additions and 1105 deletions

View File

@ -50,7 +50,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -62,3 +61,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -109,7 +109,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -4,9 +4,9 @@
import assert from 'assert'; import assert from 'assert';
// import { updateStateForMappingType, updateStateForElementaryType } from '@cerc-io/util'; import { updateStateForMappingType, updateStateForElementaryType, ResultEvent } from '@cerc-io/util';
import { Indexer, ResultEvent } from './indexer'; import { Indexer } from './indexer';
/** /**
* Hook function to store an initial state. * Hook function to store an initial state.

View File

@ -89,7 +89,7 @@ export const main = async (): Promise<any> => {
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs, jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -20,7 +20,6 @@ import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
IndexerInterface, IndexerInterface,
ValueResult, ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig, ServerConfig,
JobQueue, JobQueue,
Where, Where,
@ -31,7 +30,9 @@ import {
BlockHeight, BlockHeight,
{{/if}} {{/if}}
StateKind, StateKind,
StateStatus StateStatus,
ResultEvent,
getResultEvent
} from '@cerc-io/util'; } from '@cerc-io/util';
{{#if (subgraphPath)}} {{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
@ -64,30 +65,6 @@ const KIND_{{capitalize contract.contractName}} = '{{contract.contractKind}}';
const {{capitalize event}}_EVENT = '{{event}}'; const {{capitalize event}}_EVENT = '{{event}}';
{{/each}} {{/each}}
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
@ -170,38 +147,7 @@ export class Indexer implements IndexerInterface {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block; return getResultEvent(event);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
} }
{{#each queries as | query |}} {{#each queries as | query |}}
@ -458,12 +404,12 @@ export class Indexer implements IndexerInterface {
const logDescription = contract.parseLog({ data, topics }); const logDescription = contract.parseLog({ data, topics });
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
return { return {
eventName, eventName,
eventInfo, eventInfo,
eventSignature: logDescription.signature eventSignature
}; };
} }
@ -687,126 +633,33 @@ export class Indexer implements IndexerInterface {
} }
{{/if}} {{/if}}
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const blockPromise = this._ethClient.getBlockByHash(blockHash);
let logs: any[];
console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
const logsResult = await this._ethClient.getLogs({
blockHash,
addresses
});
logs = logsResult.logs;
} else {
({ logs } = await this._ethClient.getLogs({ blockHash }));
}
console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs');
let [
{ block },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([blockPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -48,7 +48,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8083/graphql" gqlApiEndpoint = "http://127.0.0.1:8083/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8082" rpcProviderEndpoint = "http://127.0.0.1:8082"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -60,3 +59,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -83,7 +83,7 @@ export const main = async (): Promise<any> => {
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs, jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -97,7 +97,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -4,7 +4,9 @@
import assert from 'assert'; import assert from 'assert';
import { Indexer, ResultEvent } from './indexer'; import { ResultEvent } from '@cerc-io/util';
import { Indexer } from './indexer';
/** /**
* Hook function to store an initial state. * Hook function to store an initial state.

View File

@ -3,9 +3,7 @@
// //
import assert from 'assert'; import assert from 'assert';
import debug from 'debug';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import _ from 'lodash'; import _ from 'lodash';
import { SelectionNode } from 'graphql'; import { SelectionNode } from 'graphql';
@ -16,7 +14,6 @@ import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
UNKNOWN_EVENT_NAME,
ServerConfig, ServerConfig,
JobQueue, JobQueue,
Where, Where,
@ -25,7 +22,9 @@ import {
StateKind, StateKind,
IndexerInterface, IndexerInterface,
StateStatus, StateStatus,
ValueResult ValueResult,
ResultEvent,
getResultEvent
} from '@cerc-io/util'; } from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
@ -57,37 +56,10 @@ import { Claim } from './entity/Claim';
import { Account } from './entity/Account'; import { Account } from './entity/Account';
import { Slash } from './entity/Slash'; import { Slash } from './entity/Slash';
const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });
const KIND_EDENNETWORK = 'EdenNetwork'; const KIND_EDENNETWORK = 'EdenNetwork';
const KIND_MERKLEDISTRIBUTOR = 'EdenNetworkDistribution'; const KIND_MERKLEDISTRIBUTOR = 'EdenNetworkDistribution';
const KIND_DISTRIBUTORGOVERNANCE = 'EdenNetworkGovernance'; const KIND_DISTRIBUTORGOVERNANCE = 'EdenNetworkGovernance';
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
@ -168,38 +140,7 @@ export class Indexer implements IndexerInterface {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block; return getResultEvent(event);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
} }
async getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise<ValueResult> { async getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise<ValueResult> {
@ -393,12 +334,12 @@ export class Indexer implements IndexerInterface {
const logDescription = contract.parseLog({ data, topics }); const logDescription = contract.parseLog({ data, topics });
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
return { return {
eventName, eventName,
eventInfo, eventInfo,
eventSignature: logDescription.signature eventSignature
}; };
} }
@ -522,8 +463,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
} }
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -939,126 +880,33 @@ export class Indexer implements IndexerInterface {
}); });
} }
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const blockPromise = this._ethClient.getBlockByHash(blockHash);
let logs: any[];
console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
const logsResult = await this._ethClient.getLogs({
blockHash,
addresses
});
logs = logsResult.logs;
} else {
({ logs } = await this._ethClient.getLogs({ blockHash }));
}
console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs');
let [
{ block },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([blockPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -22,7 +22,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -34,3 +33,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -78,7 +78,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client'; import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig, StateStatus } from '@cerc-io/util'; import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions, ServerConfig, StateStatus } from '@cerc-io/util';
import { Database } from './database'; import { Database } from './database';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
@ -398,8 +398,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
} }
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -422,79 +422,25 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth); return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
} }
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
let [{ block }, { logs }] = await Promise.all([
this._ethClient.getBlockByHash(blockHash),
this._ethClient.getLogs({ blockHash })
]);
const dbEvents: Array<DeepPartial<Event>> = []; const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const extraInfo = { topics, data };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
@ -502,7 +448,7 @@ export class Indexer implements IndexerInterface {
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -34,7 +34,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -46,3 +45,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs, jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -76,7 +76,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -4,9 +4,9 @@
import assert from 'assert'; import assert from 'assert';
import { updateStateForElementaryType } from '@cerc-io/util'; import { updateStateForElementaryType, ResultEvent } from '@cerc-io/util';
import { Indexer, ResultEvent } from './indexer'; import { Indexer } from './indexer';
import { TransferCount } from './entity/TransferCount'; import { TransferCount } from './entity/TransferCount';
/** /**

View File

@ -16,7 +16,6 @@ import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
IndexerInterface, IndexerInterface,
ValueResult, ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig, ServerConfig,
JobQueue, JobQueue,
Where, Where,
@ -25,7 +24,9 @@ import {
updateStateForMappingType, updateStateForMappingType,
BlockHeight, BlockHeight,
StateKind, StateKind,
StateStatus StateStatus,
ResultEvent,
getResultEvent
} from '@cerc-io/util'; } from '@cerc-io/util';
import ERC721Artifacts from './artifacts/ERC721.json'; import ERC721Artifacts from './artifacts/ERC721.json';
@ -44,30 +45,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true });
const KIND_ERC721 = 'ERC721'; const KIND_ERC721 = 'ERC721';
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
@ -119,38 +96,7 @@ export class Indexer implements IndexerInterface {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block; return getResultEvent(event);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
} }
async supportsInterface (blockHash: string, contractAddress: string, interfaceId: string): Promise<ValueResult> { async supportsInterface (blockHash: string, contractAddress: string, interfaceId: string): Promise<ValueResult> {
@ -760,12 +706,12 @@ export class Indexer implements IndexerInterface {
const logDescription = contract.parseLog({ data, topics }); const logDescription = contract.parseLog({ data, topics });
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
return { return {
eventName, eventName,
eventInfo, eventInfo,
eventSignature: logDescription.signature eventSignature
}; };
} }
@ -893,8 +839,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
} }
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -917,126 +863,33 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getAncestorAtDepth(blockHash, depth); return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
} }
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const blockPromise = this._ethClient.getBlockByHash(blockHash);
let logs: any[];
console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
const logsResult = await this._ethClient.getLogs({
blockHash,
addresses
});
logs = logsResult.logs;
} else {
({ logs } = await this._ethClient.getLogs({ blockHash }));
}
console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs');
let [
{ block },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([blockPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -1,5 +1,5 @@
import assert from 'assert'; import assert from 'assert';
import { FindConditions, FindManyOptions } from 'typeorm'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { import {
IndexerInterface, IndexerInterface,
@ -89,8 +89,8 @@ export class Indexer implements IndexerInterface {
return ''; return '';
} }
async fetchBlockWithEvents (block: BlockProgressInterface): Promise<BlockProgressInterface> { async saveBlockAndFetchEvents (block: BlockProgressInterface): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
return block; return [block, []];
} }
async removeUnknownEvents (block: BlockProgressInterface): Promise<void> { async removeUnknownEvents (block: BlockProgressInterface): Promise<void> {

View File

@ -40,7 +40,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 2000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -52,3 +51,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -83,7 +83,7 @@ export const main = async (): Promise<any> => {
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs, jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -86,7 +86,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -4,7 +4,9 @@
import assert from 'assert'; import assert from 'assert';
import { Indexer, ResultEvent } from './indexer'; import { ResultEvent } from '@cerc-io/util';
import { Indexer } from './indexer';
/** /**
* Hook function to store an initial state. * Hook function to store an initial state.

View File

@ -17,7 +17,6 @@ import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';
import { import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
ValueResult, ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig, ServerConfig,
updateStateForElementaryType, updateStateForElementaryType,
JobQueue, JobQueue,
@ -26,7 +25,9 @@ import {
BlockHeight, BlockHeight,
StateKind, StateKind,
IndexerInterface, IndexerInterface,
StateStatus StateStatus,
ResultEvent,
getResultEvent
} from '@cerc-io/util'; } from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node'; import { GraphWatcher } from '@cerc-io/graph-node';
@ -48,30 +49,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true });
const KIND_EXAMPLE1 = 'Example1'; const KIND_EXAMPLE1 = 'Example1';
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
@ -138,38 +115,7 @@ export class Indexer implements IndexerInterface {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block; return getResultEvent(event);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
} }
async getMethod (blockHash: string, contractAddress: string): Promise<ValueResult> { async getMethod (blockHash: string, contractAddress: string): Promise<ValueResult> {
@ -389,12 +335,12 @@ export class Indexer implements IndexerInterface {
const logDescription = contract.parseLog({ data, topics }); const logDescription = contract.parseLog({ data, topics });
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
return { return {
eventName, eventName,
eventInfo, eventInfo,
eventSignature: logDescription.signature eventSignature
}; };
} }
@ -522,8 +468,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
} }
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -639,126 +585,33 @@ export class Indexer implements IndexerInterface {
}); });
} }
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const blockPromise = this._ethClient.getBlockByHash(blockHash);
let logs: any[];
console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
const logsResult = await this._ethClient.getLogs({
blockHash,
addresses
});
logs = logsResult.logs;
} else {
({ logs } = await this._ethClient.getLogs({ blockHash }));
}
console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs');
let [
{ block },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([blockPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -36,7 +36,6 @@
[upstream.ethServer] [upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql" gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081" rpcProviderEndpoint = "http://127.0.0.1:8081"
blockDelayInMilliSecs = 60000
[upstream.cache] [upstream.cache]
name = "requests" name = "requests"
@ -48,3 +47,6 @@
maxCompletionLagInSecs = 300 maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100 jobDelayInMilliSecs = 100
eventsInBatch = 50 eventsInBatch = 50
blockDelayInMilliSecs = 60000
prefetchBlocksInMem = true
prefetchBlockCount = 10

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
jobQueue, jobQueue,
indexer, indexer,
eventWatcher, eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs, jobQueueConfig.blockDelayInMilliSecs,
{ {
prefetch: true, prefetch: true,
startBlock: importData.snapshotBlock.blockNumber, startBlock: importData.snapshotBlock.blockNumber,

View File

@ -76,7 +76,7 @@ export const main = async (): Promise<any> => {
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };
main().catch(err => { main().catch(err => {

View File

@ -5,9 +5,9 @@
import assert from 'assert'; import assert from 'assert';
import { utils } from 'ethers'; import { utils } from 'ethers';
// import { updateStateForMappingType, updateStateForElementaryType } from '@cerc-io/util'; import { ResultEvent } from '@cerc-io/util';
import { Indexer, KIND_PHISHERREGISTRY, ResultEvent } from './indexer'; import { Indexer, KIND_PHISHERREGISTRY } from './indexer';
const INVOKE_SIGNATURE = 'invoke(((((address,uint256,bytes),((address,bytes32,(address,bytes)[]),bytes)[])[],(uint256,uint256)),bytes)[])'; const INVOKE_SIGNATURE = 'invoke(((((address,uint256,bytes),((address,bytes32,(address,bytes)[]),bytes)[])[],(uint256,uint256)),bytes)[])';
const CLAIM_IF_MEMBER_SIGNATURE = 'claimIfMember(string,bool)'; const CLAIM_IF_MEMBER_SIGNATURE = 'claimIfMember(string,bool)';

View File

@ -16,7 +16,6 @@ import {
Indexer as BaseIndexer, Indexer as BaseIndexer,
IndexerInterface, IndexerInterface,
ValueResult, ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig, ServerConfig,
JobQueue, JobQueue,
Where, Where,
@ -26,7 +25,9 @@ import {
BlockHeight, BlockHeight,
StateKind, StateKind,
StateStatus, StateStatus,
getFullTransaction getFullTransaction,
ResultEvent,
getResultEvent
} from '@cerc-io/util'; } from '@cerc-io/util';
import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json'; import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json';
@ -49,30 +50,6 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true });
export const KIND_PHISHERREGISTRY = 'PhisherRegistry'; export const KIND_PHISHERREGISTRY = 'PhisherRegistry';
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer implements IndexerInterface { export class Indexer implements IndexerInterface {
_db: Database _db: Database
_ethClient: EthClient _ethClient: EthClient
@ -124,38 +101,7 @@ export class Indexer implements IndexerInterface {
} }
getResultEvent (event: Event): ResultEvent { getResultEvent (event: Event): ResultEvent {
const block = event.block; return getResultEvent(event);
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
} }
async multiNonce (blockHash: string, contractAddress: string, key0: string, key1: bigint, diff = false): Promise<ValueResult> { async multiNonce (blockHash: string, contractAddress: string, key0: string, key1: bigint, diff = false): Promise<ValueResult> {
@ -487,12 +433,12 @@ export class Indexer implements IndexerInterface {
const logDescription = contract.parseLog({ data, topics }); const logDescription = contract.parseLog({ data, topics });
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
return { return {
eventName, eventName,
eventInfo, eventInfo,
eventSignature: logDescription.signature eventSignature
}; };
} }
@ -620,8 +566,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned); return this._baseIndexer.getBlocksAtHeight(height, isPruned);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); return this._baseIndexer.saveBlockAndFetchEvents(block, this._saveBlockAndFetchEvents.bind(this));
} }
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> { async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -661,126 +607,33 @@ export class Indexer implements IndexerInterface {
return this._contractMap.get(kind); return this._contractMap.get(kind);
} }
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> { async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
assert(blockHash); assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const blockPromise = this._ethClient.getBlockByHash(blockHash);
let logs: any[];
console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); const dbEvents = await this._baseIndexer.fetchEvents(blockHash, this.parseEventNameAndArgs.bind(this));
if (this._serverConfig.filterLogs) {
const watchedContracts = this._baseIndexer.getWatchedContracts();
const addresses = watchedContracts.map((watchedContract): string => {
return watchedContract.address;
});
const logsResult = await this._ethClient.getLogs({
blockHash,
addresses
});
logs = logsResult.logs;
} else {
({ logs } = await this._ethClient.getLogs({ blockHash }));
}
console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs');
let [
{ block },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([blockPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<Event>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = await this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
const dbTx = await this._db.createTransactionRunner(); const dbTx = await this._db.createTransactionRunner();
try { try {
block = { const block = {
cid: blockCid, cid: blockCid,
blockHash, blockHash,
blockNumber: block.number, blockNumber,
blockTimestamp: block.timestamp, blockTimestamp,
parentHash: block.parent.hash parentHash
}; };
console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); console.time(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction(); await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return blockProgress; return [blockProgress, []];
} catch (error) { } catch (error) {
await dbTx.rollbackTransaction(); await dbTx.rollbackTransaction();
throw error; throw error;

View File

@ -21,76 +21,10 @@ const DEFAULT_EVENTS_IN_BATCH = 50;
const log = debug('vulcanize:common'); const log = debug('vulcanize:common');
export interface PrefetchedBlock { export interface PrefetchedBlock {
block: any; block: BlockProgressInterface;
events: DeepPartial<EventInterface>[]; events: DeepPartial<EventInterface>[];
} }
/**
* Method to fetch block by number and push to job queue.
* @param jobQueue
* @param indexer
* @param blockDelayInMilliSecs
* @param blockNumber
*/
export const processBlockByNumber = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
blockDelayInMilliSecs: number,
blockNumber: number
): Promise<void> => {
log(`Process block ${blockNumber}`);
console.time('time:common#processBlockByNumber-get-blockProgress-syncStatus');
const [blockProgressEntities, syncStatus] = await Promise.all([
indexer.getBlocksAtHeight(blockNumber, false),
indexer.getSyncStatus()
]);
console.timeEnd('time:common#processBlockByNumber-get-blockProgress-syncStatus');
while (true) {
let blocks = blockProgressEntities.map((block: any) => {
block.timestamp = block.blockTimestamp;
return block;
});
if (!blocks.length) {
blocks = await indexer.getBlocks({ blockNumber });
}
if (blocks.length) {
for (let bi = 0; bi < blocks.length; bi++) {
const { cid, blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
// Stop blocks already pushed to job queue. They are already retried after fail.
if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) {
await jobQueue.pushJob(
QUEUE_BLOCK_PROCESSING,
{
kind: JOB_KIND_INDEX,
blockNumber: Number(blockNumber),
cid,
blockHash,
parentHash,
timestamp
}
);
}
}
await indexer.updateSyncStatusChainHead(blocks[0].blockHash, Number(blocks[0].blockNumber));
return;
}
log(`No blocks fetched for block number ${blockNumber}, retrying after ${blockDelayInMilliSecs} ms delay.`);
await wait(blockDelayInMilliSecs);
}
};
/** /**
* Create a processing job in QUEUE_BLOCK_PROCESSING. * Create a processing job in QUEUE_BLOCK_PROCESSING.
* @param jobQueue * @param jobQueue
@ -166,7 +100,6 @@ export const fetchBlocksAtHeight = async (
if (!blocks.length) { if (!blocks.length) {
log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`); log(`No blocks fetched for block number ${blockNumber}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
assert(jobQueueConfig.blockDelayInMilliSecs);
await wait(jobQueueConfig.blockDelayInMilliSecs); await wait(jobQueueConfig.blockDelayInMilliSecs);
} }
} }
@ -205,8 +138,8 @@ export const _prefetchBlocks = async (
blockNumber + jobQueueConfig.prefetchBlockCount blockNumber + jobQueueConfig.prefetchBlockCount
); );
blocksWithEvents.forEach(({ block, events }) => { blocksWithEvents.forEach(({ blockProgress, events }) => {
prefetchedBlocksMap.set(block.blockHash, { block, events }); prefetchedBlocksMap.set(blockProgress.blockHash, { block: blockProgress, events });
}); });
}; };
@ -218,7 +151,7 @@ export const _prefetchBlocks = async (
* @param endBlock * @param endBlock
*/ */
export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise<any[]> => { export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfig: JobQueueConfig, startBlock: number, endBlock: number): Promise<any[]> => {
let blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock); const blockNumbers = [...Array(endBlock - startBlock).keys()].map(n => n + startBlock);
let blocks = []; let blocks = [];
// Fetch blocks again if there are missing blocks. // Fetch blocks again if there are missing blocks.
@ -228,20 +161,17 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi
const res = await Promise.all(blockPromises); const res = await Promise.all(blockPromises);
console.timeEnd('time:common#fetchBatchBlocks-getBlocks'); console.timeEnd('time:common#fetchBatchBlocks-getBlocks');
const missingIndex = res.findIndex(blocks => blocks.length === 0); const firstMissingBlockIndex = res.findIndex(blocks => blocks.length === 0);
// TODO Continue to process available blocks instead of retrying for whole range. if (firstMissingBlockIndex === -1) {
if (missingIndex < 0) { blocks = res;
blocks = blocks.concat(res); break;
} else if (firstMissingBlockIndex > 0) {
blocks = res.slice(0, firstMissingBlockIndex);
break; break;
} }
log('missing block number:', blockNumbers[missingIndex]); log(`No blocks fetched for block number ${blockNumbers[0]}, retrying after ${jobQueueConfig.blockDelayInMilliSecs} ms delay.`);
blocks.push(res.slice(0, missingIndex));
blockNumbers = blockNumbers.slice(missingIndex);
assert(jobQueueConfig.blockDelayInMilliSecs);
await wait(jobQueueConfig.blockDelayInMilliSecs); await wait(jobQueueConfig.blockDelayInMilliSecs);
} }
@ -254,11 +184,9 @@ export const _fetchBatchBlocks = async (indexer: IndexerInterface, jobQueueConfi
// TODO Catch errors and continue to process available events instead of retrying for whole range because of an error. // TODO Catch errors and continue to process available events instead of retrying for whole range because of an error.
const blockAndEventPromises = blocks.map(async block => { const blockAndEventPromises = blocks.map(async block => {
block.blockTimestamp = block.timestamp; block.blockTimestamp = block.timestamp;
const [blockProgress, events] = await indexer.saveBlockAndFetchEvents(block);
assert(indexer.fetchBlockEvents); return { blockProgress, events };
const events = await indexer.fetchBlockEvents(block);
return { block, events };
}); });
return Promise.all(blockAndEventPromises); return Promise.all(blockAndEventPromises);
@ -326,7 +254,7 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
// uni-info-watcher indexer doesn't have watched contracts implementation. // uni-info-watcher indexer doesn't have watched contracts implementation.
watchedContract = true; watchedContract = true;
} else { } else {
watchedContract = await indexer.isWatchedContract(event.contract); watchedContract = indexer.isWatchedContract(event.contract);
} }
if (watchedContract) { if (watchedContract) {

View File

@ -24,7 +24,7 @@ export interface JobQueueConfig {
eventsInBatch: number; eventsInBatch: number;
lazyUpdateBlockProgress?: boolean; lazyUpdateBlockProgress?: boolean;
subgraphEventsOrder: boolean; subgraphEventsOrder: boolean;
blockDelayInMilliSecs?: number; blockDelayInMilliSecs: number;
prefetchBlocksInMem: boolean; prefetchBlocksInMem: boolean;
prefetchBlockCount: number; prefetchBlockCount: number;
} }
@ -49,7 +49,6 @@ export interface UpstreamConfig {
ethServer: { ethServer: {
gqlApiEndpoint: string; gqlApiEndpoint: string;
rpcProviderEndpoint: string; rpcProviderEndpoint: string;
blockDelayInMilliSecs: number;
} }
traceProviderEndpoint: string; traceProviderEndpoint: string;
uniWatcher: { uniWatcher: {

View File

@ -11,7 +11,7 @@ import { EthClient } from '@cerc-io/ipld-eth-client';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants'; import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME } from './constants';
import { createPruningJob, processBlockByNumber } from './common'; import { createPruningJob, processBlockByNumberWithCache } from './common';
import { UpstreamConfig } from './config'; import { UpstreamConfig } from './config';
import { OrderDirection } from './database'; import { OrderDirection } from './database';
@ -58,10 +58,8 @@ export class EventWatcher {
startBlockNumber = syncStatus.chainHeadBlockNumber + 1; startBlockNumber = syncStatus.chainHeadBlockNumber + 1;
} }
const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
// Wait for block processing as blockProgress event might process the same block. // Wait for block processing as blockProgress event might process the same block.
await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, startBlockNumber); await processBlockByNumberWithCache(this._jobQueue, startBlockNumber);
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -76,7 +74,7 @@ export class EventWatcher {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data; const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (isComplete) { if (isComplete) {
await processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1); await processBlockByNumberWithCache(this._jobQueue, blockNumber + 1);
} }
} }
} }
@ -139,23 +137,22 @@ export class EventWatcher {
} }
async _handleIndexingComplete (jobData: any): Promise<void> { async _handleIndexingComplete (jobData: any): Promise<void> {
const { blockHash, blockNumber, priority } = jobData; const { blockNumber, priority } = jobData;
const [blockProgress, syncStatus] = await Promise.all([ const blockProgressEntities = await this._indexer.getBlocksAtHeight(Number(blockNumber), false);
this._indexer.getBlockProgress(blockHash),
// Update sync progress.
this._indexer.updateSyncStatusIndexedBlock(blockHash, blockNumber)
]);
if (blockProgress) { // Log a warning and return if block entries not found.
log(`Job onComplete indexing block ${blockHash} ${blockNumber}`); if (blockProgressEntities.length === 0) {
log(`block not indexed at height ${blockNumber}`);
return;
}
// Create pruning job if required. const syncStatus = await this._indexer.updateSyncStatusIndexedBlock(blockProgressEntities[0].blockHash, Number(blockNumber));
if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) { log(`Job onComplete indexing block ${blockProgressEntities[0].blockHash} ${blockNumber}`);
await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
} // Create pruning job if required.
} else { if (syncStatus && syncStatus.latestIndexedBlockNumber > (syncStatus.latestCanonicalBlockNumber + MAX_REORG_DEPTH)) {
log(`block not indexed for ${blockHash} ${blockNumber}`); await createPruningJob(this._jobQueue, syncStatus.latestCanonicalBlockNumber, priority);
} }
} }

View File

@ -7,7 +7,7 @@ import debug from 'debug';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { EventWatcherInterface, IndexerInterface } from './types'; import { EventWatcherInterface, IndexerInterface } from './types';
import { wait } from './misc'; import { wait } from './misc';
import { processBlockByNumber } from './common'; import { processBlockByNumberWithCache } from './common';
const log = debug('vulcanize:fill'); const log = debug('vulcanize:fill');
@ -59,7 +59,7 @@ export const fillBlocks = async (
const numberOfBlocks = endBlock - startBlock + 1; const numberOfBlocks = endBlock - startBlock + 1;
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, startBlock); processBlockByNumberWithCache(jobQueue, startBlock);
// Creating an AsyncIterable from AsyncIterator to iterate over the values. // Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of // https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -80,7 +80,7 @@ export const fillBlocks = async (
const completePercentage = Math.round(blocksProcessed / numberOfBlocks * 100); const completePercentage = Math.round(blocksProcessed / numberOfBlocks * 100);
log(`Processed ${blocksProcessed} of ${numberOfBlocks} blocks (${completePercentage}%)`); log(`Processed ${blocksProcessed} of ${numberOfBlocks} blocks (${completePercentage}%)`);
await processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, blockNumber + 1); await processBlockByNumberWithCache(jobQueue, blockNumber + 1);
if (blockNumber + 1 >= endBlock) { if (blockNumber + 1 >= endBlock) {
// Break the async loop when blockProgress event is for the endBlock and processing is complete. // Break the async loop when blockProgress event is for the endBlock and processing is complete.
@ -130,7 +130,7 @@ const prefetchBlocks = async (
const blockProgress = await indexer.getBlockProgress(blockHash); const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress) { if (!blockProgress) {
await indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); await indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
} }
}); });

View File

@ -36,7 +36,7 @@ export const indexBlock = async (
// Check if blockProgress fetched from database. // Check if blockProgress fetched from database.
if (!partialblockProgress.id) { if (!partialblockProgress.id) {
blockProgress = await indexer.fetchBlockWithEvents(partialblockProgress); [blockProgress] = await indexer.saveBlockAndFetchEvents(partialblockProgress);
} else { } else {
blockProgress = partialblockProgress as BlockProgressInterface; blockProgress = partialblockProgress as BlockProgressInterface;
} }

View File

@ -5,6 +5,7 @@
import assert from 'assert'; import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import debug from 'debug'; import debug from 'debug';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers'; import { ethers } from 'ethers';
import _ from 'lodash'; import _ from 'lodash';
import { sha256 } from 'multiformats/hashes/sha2'; import { sha256 } from 'multiformats/hashes/sha2';
@ -32,6 +33,7 @@ import { ServerConfig } from './config';
const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000;
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });
export interface ValueResult { export interface ValueResult {
value: any; value: any;
@ -62,6 +64,30 @@ export type ResultState = {
data: string; data: string;
}; };
export type ResultEvent = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
tx: {
hash: string;
from: string;
to: string;
index: number;
};
contract: string;
eventIndex: number;
eventSignature: string;
event: any;
proof: string;
};
export class Indexer { export class Indexer {
_serverConfig: ServerConfig; _serverConfig: ServerConfig;
_db: DatabaseInterface; _db: DatabaseInterface;
@ -237,26 +263,115 @@ export class Indexer {
return this._db.getEvent(id); return this._db.getEvent(id);
} }
async fetchBlockWithEvents (block: DeepPartial<BlockProgressInterface>, fetchAndSaveEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<BlockProgressInterface>): Promise<BlockProgressInterface> { async saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>, saveBlockAndFetchEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]> {
assert(block.blockHash); assert(block.blockHash);
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); log(`getBlockEvents: fetching from upstream server ${block.blockHash}`);
const blockProgress = await fetchAndSaveEvents(block); const [blockProgress, events] = await saveBlockAndFetchEvents(block);
log(`getBlockEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`); log(`getBlockEvents: fetched for block: ${blockProgress.blockHash} num events: ${blockProgress.numEvents}`);
return blockProgress; return [blockProgress, events];
} }
async fetchBlockEvents (block: DeepPartial<BlockProgressInterface>, fetchEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<DeepPartial<EventInterface>[]>): Promise<DeepPartial<EventInterface>[]> { async fetchEvents (blockHash: string, parseEventNameAndArgs: (kind: string, logObj: any) => any): Promise<DeepPartial<EventInterface>[]> {
assert(block.blockHash); let logsPromise: Promise<any>;
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); if (this._serverConfig.filterLogs) {
console.time(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); const watchedContracts = this.getWatchedContracts();
const events = await fetchEvents(block); const addresses = watchedContracts.map((watchedContract): string => {
console.timeEnd(`time:indexer#fetchBlockEvents-fetchAndSaveEvents-${block.blockHash}`); return watchedContract.address;
log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`); });
return events; logsPromise = this._ethClient.getLogs({
blockHash,
addresses
});
} else {
logsPromise = this._ethClient.getLogs({ blockHash });
}
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
const [
{ logs },
{
allEthHeaderCids: {
nodes: [
{
ethTransactionCidsByHeaderId: {
nodes: transactions
}
}
]
}
}
] = await Promise.all([logsPromise, transactionsPromise]);
const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
return acc;
}, {});
const dbEvents: Array<DeepPartial<EventInterface>> = [];
for (let li = 0; li < logs.length; li++) {
const logObj = logs[li];
const {
topics,
data,
index: logIndex,
cid,
ipldBlock,
account: {
address
},
transaction: {
hash: txHash
},
receiptCID,
status
} = logObj;
if (status) {
let eventName = UNKNOWN_EVENT_NAME;
let eventInfo = {};
const tx = transactionMap[txHash];
const extraInfo: { [key: string]: any } = { topics, data, tx };
const contract = ethers.utils.getAddress(address);
const watchedContract = this.isWatchedContract(contract);
if (watchedContract) {
const eventDetails = parseEventNameAndArgs(watchedContract.kind, logObj);
eventName = eventDetails.eventName;
eventInfo = eventDetails.eventInfo;
extraInfo.eventSignature = eventDetails.eventSignature;
}
dbEvents.push({
index: logIndex,
txHash,
contract,
eventName,
eventInfo: JSONbigNative.stringify(eventInfo),
extraInfo: JSONbigNative.stringify(extraInfo),
proof: JSONbigNative.stringify({
data: JSONbigNative.stringify({
blockHash,
receiptCID,
log: {
cid,
ipldBlock
}
})
})
});
} else {
log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
}
}
return dbEvents;
} }
async saveBlockProgress (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> { async saveBlockProgress (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
@ -872,9 +987,7 @@ export class Indexer {
this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus); this._stateStatusMap[address] = _.merge(oldStateStatus, stateStatus);
} }
parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any } { parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any, eventSignature: string } {
const eventName = logDescription.name;
const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => { const eventInfo = logDescription.eventFragment.inputs.reduce((acc: any, input, index) => {
acc[input.name] = this._parseLogArg(input, logDescription.args[index]); acc[input.name] = this._parseLogArg(input, logDescription.args[index]);
@ -882,8 +995,9 @@ export class Indexer {
}, {}); }, {});
return { return {
eventName, eventName: logDescription.name,
eventInfo eventInfo,
eventSignature: logDescription.signature
}; };
} }

View File

@ -17,13 +17,15 @@ import {
QUEUE_EVENT_PROCESSING QUEUE_EVENT_PROCESSING
} from './constants'; } from './constants';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; import { EventInterface, IndexerInterface } from './types';
import { wait } from './misc'; import { wait } from './misc';
import { import {
createPruningJob, createPruningJob,
createHooksJob, createHooksJob,
createCheckpointJob, createCheckpointJob,
processBatchEvents processBatchEvents,
PrefetchedBlock,
fetchBlocksAtHeight
} from './common'; } from './common';
import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
@ -37,6 +39,7 @@ export class JobRunner {
_endBlockProcessTimer?: () => void _endBlockProcessTimer?: () => void
_shutDown = false _shutDown = false
_signalCount = 0 _signalCount = 0
_prefetchedBlocksMap: Map<string, PrefetchedBlock> = new Map()
constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) { constructor (jobQueueConfig: JobQueueConfig, indexer: IndexerInterface, jobQueue: JobQueue) {
this._indexer = indexer; this._indexer = indexer;
@ -47,16 +50,21 @@ export class JobRunner {
async processBlock (job: any): Promise<void> { async processBlock (job: any): Promise<void> {
const { data: { kind } } = job; const { data: { kind } } = job;
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
switch (kind) { switch (kind) {
case JOB_KIND_INDEX: case JOB_KIND_INDEX: {
await this._indexBlock(job, syncStatus); const blocksToBeIndexed = await fetchBlocksAtHeight(
job,
this._indexer,
this._jobQueueConfig,
this._prefetchedBlocksMap
);
const indexBlockPromises = blocksToBeIndexed.map(blockToBeIndexed => this._indexBlock(job, blockToBeIndexed));
await Promise.all(indexBlockPromises);
break; break;
}
case JOB_KIND_PRUNE: { case JOB_KIND_PRUNE: {
await this._pruneChain(job, syncStatus); await this._pruneChain(job);
// Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block. // Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block.
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
@ -180,8 +188,12 @@ export class JobRunner {
} }
} }
async _pruneChain (job: any, syncStatus: SyncStatusInterface): Promise<void> { async _pruneChain (job: any): Promise<void> {
console.time('time:job-runner#_pruneChain'); console.time('time:job-runner#_pruneChain');
const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
const { pruneBlockHeight } = job.data; const { pruneBlockHeight } = job.data;
log(`Processing chain pruning at ${pruneBlockHeight}`); log(`Processing chain pruning at ${pruneBlockHeight}`);
@ -226,8 +238,12 @@ export class JobRunner {
console.timeEnd('time:job-runner#_pruneChain'); console.timeEnd('time:job-runner#_pruneChain');
} }
async _indexBlock (job: any, syncStatus: SyncStatusInterface): Promise<void> { async _indexBlock (job: any, blockToBeIndexed: any): Promise<void> {
const { data: { cid, blockHash, blockNumber, parentHash, priority, timestamp } } = job; const syncStatus = await this._indexer.getSyncStatus();
assert(syncStatus);
const { data: { priority } } = job;
const { cid, blockHash, blockNumber, parentHash, blockTimestamp } = blockToBeIndexed;
const indexBlockStartTime = new Date(); const indexBlockStartTime = new Date();
@ -325,13 +341,21 @@ export class JobRunner {
} }
if (!blockProgress) { if (!blockProgress) {
const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig; const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash);
// Delay required to process block. if (prefetchedBlock) {
await wait(jobDelayInMilliSecs); ({ block: blockProgress } = prefetchedBlock);
console.time('time:job-runner#_indexBlock-fetch-block-events'); } else {
blockProgress = await this._indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); // Delay required to process block.
console.timeEnd('time:job-runner#_indexBlock-fetch-block-events'); const { jobDelayInMilliSecs = 0 } = this._jobQueueConfig;
await wait(jobDelayInMilliSecs);
console.time('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
[blockProgress] = await this._indexer.saveBlockAndFetchEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp });
console.timeEnd('time:job-runner#_indexBlock-saveBlockAndFetchEvents');
this._prefetchedBlocksMap.set(blockHash, { block: blockProgress, events: [] });
}
} }
await this._indexer.processBlock(blockProgress); await this._indexer.processBlock(blockProgress);
@ -347,21 +371,30 @@ export class JobRunner {
async _processEvents (job: any): Promise<void> { async _processEvents (job: any): Promise<void> {
const { blockHash } = job.data; const { blockHash } = job.data;
console.time('time:job-runner#_processEvents-get-block-progress'); if (!this._prefetchedBlocksMap.has(blockHash)) {
const block = await this._indexer.getBlockProgress(blockHash); console.time('time:job-runner#_processEvents-get-block-progress');
console.timeEnd('time:job-runner#_processEvents-get-block-progress'); const block = await this._indexer.getBlockProgress(blockHash);
assert(block); console.timeEnd('time:job-runner#_processEvents-get-block-progress');
assert(block);
this._prefetchedBlocksMap.set(blockHash, { block, events: [] });
}
const prefetchedBlock = this._prefetchedBlocksMap.get(blockHash);
assert(prefetchedBlock);
const { block } = prefetchedBlock;
console.time('time:job-runner#_processEvents-events'); console.time('time:job-runner#_processEvents-events');
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch); await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch);
console.timeEnd('time:job-runner#_processEvents-events'); console.timeEnd('time:job-runner#_processEvents-events');
// Update metrics // Update metrics
lastProcessedBlockNumber.set(block.blockNumber); lastProcessedBlockNumber.set(block.blockNumber);
lastBlockNumEvents.set(block.numEvents); lastBlockNumEvents.set(block.numEvents);
this._prefetchedBlocksMap.delete(block.blockHash);
if (this._endBlockProcessTimer) { if (this._endBlockProcessTimer) {
this._endBlockProcessTimer(); this._endBlockProcessTimer();
} }

View File

@ -7,8 +7,8 @@ import { ValueTransformer } from 'typeorm';
import yargs from 'yargs'; import yargs from 'yargs';
import { hideBin } from 'yargs/helpers'; import { hideBin } from 'yargs/helpers';
import { utils, providers } from 'ethers'; import { utils, providers } from 'ethers';
import JSONbig from 'json-bigint';
import Decimal from 'decimal.js'; import Decimal from 'decimal.js';
import debug from 'debug';
import { EthClient } from '@cerc-io/ipld-eth-client'; import { EthClient } from '@cerc-io/ipld-eth-client';
@ -18,6 +18,10 @@ import { JobQueue } from './job-queue';
import { GraphDecimal } from './graph-decimal'; import { GraphDecimal } from './graph-decimal';
import * as EthDecoder from './eth'; import * as EthDecoder from './eth';
import { getCachedBlockSize } from './block-size-cache'; import { getCachedBlockSize } from './block-size-cache';
import { ResultEvent } from './indexer';
import { EventInterface } from './types';
const JSONbigNative = JSONbig({ useNativeBigInt: true });
/** /**
* Method to wait for specified time. * Method to wait for specified time.
@ -248,3 +252,38 @@ export const jsonBigIntStringReplacer = (_: string, value: any): any => {
return value; return value;
}; };
export const getResultEvent = (event: EventInterface): ResultEvent => {
const block = event.block;
const eventFields = JSONbigNative.parse(event.eventInfo);
const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo);
return {
block: {
cid: block.cid,
hash: block.blockHash,
number: block.blockNumber,
timestamp: block.blockTimestamp,
parentHash: block.parentHash
},
tx: {
hash: event.txHash,
from: tx.src,
to: tx.dst,
index: tx.index
},
contract: event.contract,
eventIndex: event.index,
eventSignature,
event: {
__typename: `${event.eventName}Event`,
...eventFields
},
// TODO: Return proof only if requested.
proof: JSON.parse(event.proof)
};
};

View File

@ -91,8 +91,7 @@ export interface IndexerInterface {
getLatestCanonicalBlock (): Promise<BlockProgressInterface> getLatestCanonicalBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>> getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string> getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchBlockWithEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> saveBlockAndFetchEvents (block: DeepPartial<BlockProgressInterface>): Promise<[BlockProgressInterface, DeepPartial<EventInterface>[]]>
fetchBlockEvents?: (block: DeepPartial<BlockProgressInterface>) => Promise<DeepPartial<EventInterface>[]>
removeUnknownEvents (block: BlockProgressInterface): Promise<void> removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface> updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>