Support events handlers in multiple data sources for a contract address (#526)

* Support processing events in multiple subgraph datasources for a single contract address

* Fix parsing event topic in graph-node watcher

* Update codegen templates

* Fix dummy indexer method in graph-node test

* Upgrade package versions to 0.2.102
This commit is contained in:
Nabarun Gogoi 2024-06-26 17:56:37 +05:30 committed by GitHub
parent b9a899aec1
commit 2217cd3ffb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 208 additions and 153 deletions

View File

@ -2,7 +2,7 @@
"packages": [
"packages/*"
],
"version": "0.2.101",
"version": "0.2.102",
"npmClient": "yarn",
"useWorkspaces": true,
"command": {

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/cache",
"version": "0.2.101",
"version": "0.2.102",
"description": "Generic object cache",
"main": "dist/index.js",
"scripts": {

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/cli",
"version": "0.2.101",
"version": "0.2.102",
"main": "dist/index.js",
"license": "AGPL-3.0",
"scripts": {
@ -15,13 +15,13 @@
},
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/cache": "^0.2.102",
"@cerc-io/ipld-eth-client": "^0.2.102",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.101",
"@cerc-io/rpc-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.101",
"@cerc-io/peer": "^0.2.102",
"@cerc-io/rpc-eth-client": "^0.2.102",
"@cerc-io/util": "^0.2.102",
"@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0",

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/codegen",
"version": "0.2.101",
"version": "0.2.102",
"description": "Code generator",
"private": true,
"main": "index.js",
@ -20,7 +20,7 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/util": "^0.2.101",
"@cerc-io/util": "^0.2.102",
"@graphql-tools/load-files": "^6.5.2",
"@npmcli/package-json": "^5.0.0",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",

View File

@ -2,6 +2,7 @@ className: Contract
indexOn:
- columns:
- address
- kind
unique: true
columns:
- name: id

View File

@ -525,23 +525,27 @@ export class Indexer implements IndexerInterface {
}
{{/if}}
parseEventNameAndArgs (kind: string, logObj: any): { eventParsed: boolean, eventDetails: any } {
parseEventNameAndArgs (watchedContracts: Contract[], logObj: any): { eventParsed: boolean, eventDetails: any } {
const { topics, data } = logObj;
let logDescription: ethers.utils.LogDescription | undefined;
const contract = this._contractMap.get(kind);
for (const watchedContract of watchedContracts) {
const contract = this._contractMap.get(watchedContract.kind);
assert(contract);
let logDescription: ethers.utils.LogDescription;
try {
logDescription = contract.parseLog({ data, topics });
break;
} catch (err) {
// Return if no matching event found
if ((err as Error).message.includes('no matching event')) {
log(`WARNING: Skipping event for contract ${kind} as no matching event found in the ABI`);
return { eventParsed: false, eventDetails: {} };
// Continue loop only if no matching event found
if (!((err as Error).message.includes('no matching event'))) {
throw err;
}
}
}
throw err;
if (!logDescription) {
return { eventParsed: false, eventDetails: {} };
}
const { eventName, eventInfo, eventSignature } = this._baseIndexer.parseEvent(logDescription);
@ -647,8 +651,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
isContractAddressWatched (address : string): Contract[] | undefined {
return this._baseIndexer.isContractAddressWatched(address);
}
getWatchedContracts (): Contract[] {

View File

@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/solidity-mapper": "^0.2.101",
"@cerc-io/util": "^0.2.101",
"@cerc-io/cli": "^0.2.102",
"@cerc-io/ipld-eth-client": "^0.2.102",
"@cerc-io/solidity-mapper": "^0.2.102",
"@cerc-io/util": "^0.2.102",
{{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.101",
"@cerc-io/graph-node": "^0.2.102",
{{/if}}
"@ethersproject/providers": "^5.4.4",
"debug": "^4.3.1",

View File

@ -1,10 +1,10 @@
{
"name": "@cerc-io/graph-node",
"version": "0.2.101",
"version": "0.2.102",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.101",
"@cerc-io/solidity-mapper": "^0.2.102",
"@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",
@ -51,9 +51,9 @@
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.101",
"@cerc-io/cache": "^0.2.102",
"@cerc-io/ipld-eth-client": "^0.2.102",
"@cerc-io/util": "^0.2.102",
"@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0",
"bn.js": "^4.11.9",

View File

@ -50,6 +50,7 @@ export interface Context {
rpcSupportsBlockHashParam: boolean;
block?: Block;
contractAddress?: string;
dataSourceName?: string;
}
const log = debug('vulcanize:graph-node');
@ -719,13 +720,14 @@ export const instantiate = async (
},
'dataSource.context': async () => {
assert(context.contractAddress);
const contract = indexer.isWatchedContract(context.contractAddress);
const watchedContracts = indexer.isContractAddressWatched(context.contractAddress);
const dataSourceContract = watchedContracts?.find(contract => contract.kind === context.dataSourceName);
if (!contract) {
if (!dataSourceContract) {
return null;
}
return database.toGraphContext(instanceExports, contract.context);
return database.toGraphContext(instanceExports, dataSourceContract.context);
},
'dataSource.network': async () => {
assert(dataSource);

View File

@ -169,7 +169,6 @@ export class GraphWatcher {
async addContracts () {
assert(this._indexer);
assert(this._indexer.watchContract);
assert(this._indexer.isWatchedContract);
// Watching the contract(s) if not watched already.
for (const dataSource of this._dataSources) {
@ -177,7 +176,7 @@ export class GraphWatcher {
// Skip for templates as they are added dynamically.
if (address) {
const watchedContract = await this._indexer.isWatchedContract(address);
const watchedContract = this._indexer.isContractAddressWatched(address);
if (!watchedContract) {
await this._indexer.watchContract(address, name, true, startBlock);
@ -197,26 +196,40 @@ export class GraphWatcher {
const blockData = this._context.block;
assert(blockData);
assert(this._indexer && this._indexer.isWatchedContract);
const watchedContract = this._indexer.isWatchedContract(contract);
assert(watchedContract);
assert(this._indexer);
const watchedContracts = this._indexer.isContractAddressWatched(contract);
assert(watchedContracts);
// Get dataSource in subgraph yaml based on contract address.
const dataSource = this._dataSources.find(dataSource => dataSource.name === watchedContract.kind);
// Get dataSources in subgraph yaml based on contract kind (same as dataSource.name)
const dataSources = this._dataSources
.filter(dataSource => watchedContracts.some(contract => contract.kind === dataSource.name));
if (!dataSource) {
if (!dataSources.length) {
log(`Subgraph doesn't have configuration for contract ${contract}`);
return;
}
for (const dataSource of dataSources) {
this._context.contractAddress = contract;
this._context.dataSourceName = dataSource.name;
const { instance, contractInterface } = this._dataSourceMap[watchedContract.kind];
const { instance, contractInterface } = this._dataSourceMap[dataSource.name];
assert(instance);
const { exports: instanceExports } = instance;
let eventTopic: string;
try {
eventTopic = contractInterface.getEventTopic(eventSignature);
} catch (err) {
// Continue loop only if no matching event found
if (!((err as Error).message.includes('no matching event'))) {
throw err;
}
continue;
}
// Get event handler based on event topic (from event signature).
const eventTopic = contractInterface.getEventTopic(eventSignature);
const eventHandler = dataSource.mapping.eventHandlers.find((eventHandler: any) => {
// The event signature we get from logDescription is different than that given in the subgraph yaml file.
// For eg. event in subgraph.yaml: Stake(indexed address,uint256); from logDescription: Stake(address,uint256)
@ -257,6 +270,7 @@ export class GraphWatcher {
throw error;
}
}
}
async handleBlock (blockHash: string, blockNumber: number, extraData: ExtraEventData) {
// Clear transactions map on handling new block.
@ -311,6 +325,7 @@ export class GraphWatcher {
for (const contractAddress of contractAddressList) {
this._context.contractAddress = contractAddress;
this._context.dataSourceName = dataSource.name;
// Call all the block handlers one after another for a contract.
const blockHandlerPromises = dataSource.mapping.blockHandlers.map(async (blockHandler: any): Promise<void> => {

View File

@ -248,7 +248,7 @@ export class Indexer implements IndexerInterface {
return undefined;
}
isWatchedContract (address : string): ContractInterface | undefined {
isContractAddressWatched (address : string): ContractInterface[] | undefined {
return undefined;
}

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/ipld-eth-client",
"version": "0.2.101",
"version": "0.2.102",
"description": "IPLD ETH Client",
"main": "dist/index.js",
"scripts": {
@ -20,8 +20,8 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.101",
"@cerc-io/util": "^0.2.101",
"@cerc-io/cache": "^0.2.102",
"@cerc-io/util": "^0.2.102",
"cross-fetch": "^3.1.4",
"debug": "^4.3.1",
"ethers": "^5.4.4",

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/peer",
"version": "0.2.101",
"version": "0.2.102",
"description": "libp2p module",
"main": "dist/index.js",
"exports": "./dist/index.js",

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/rpc-eth-client",
"version": "0.2.101",
"version": "0.2.102",
"description": "RPC ETH Client",
"main": "dist/index.js",
"scripts": {
@ -19,9 +19,9 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/cache": "^0.2.101",
"@cerc-io/ipld-eth-client": "^0.2.101",
"@cerc-io/util": "^0.2.101",
"@cerc-io/cache": "^0.2.102",
"@cerc-io/ipld-eth-client": "^0.2.102",
"@cerc-io/util": "^0.2.102",
"chai": "^4.3.4",
"ethers": "^5.4.4",
"left-pad": "^1.3.0",

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/solidity-mapper",
"version": "0.2.101",
"version": "0.2.102",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/test",
"version": "0.2.101",
"version": "0.2.102",
"main": "dist/index.js",
"license": "AGPL-3.0",
"private": true,

View File

@ -1,6 +1,6 @@
{
"name": "@cerc-io/tracing-client",
"version": "0.2.101",
"version": "0.2.102",
"description": "ETH VM tracing client",
"main": "dist/index.js",
"scripts": {

View File

@ -1,13 +1,13 @@
{
"name": "@cerc-io/util",
"version": "0.2.101",
"version": "0.2.102",
"main": "dist/index.js",
"license": "AGPL-3.0",
"dependencies": {
"@apollo/utils.keyvaluecache": "^1.0.1",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.101",
"@cerc-io/solidity-mapper": "^0.2.101",
"@cerc-io/peer": "^0.2.102",
"@cerc-io/solidity-mapper": "^0.2.102",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"@ethersproject/properties": "^5.7.0",
"@ethersproject/providers": "^5.4.4",
@ -54,7 +54,7 @@
"yargs": "^17.0.1"
},
"devDependencies": {
"@cerc-io/cache": "^0.2.101",
"@cerc-io/cache": "^0.2.102",
"@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/bunyan": "^1.8.8",
"@types/express": "^4.17.14",

View File

@ -12,7 +12,7 @@ import {
UNKNOWN_EVENT_NAME
} from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock } from './types';
import { BlockProgressInterface, IndexerInterface, EventInterface, EthFullTransaction, EthFullBlock, ContractInterface } from './types';
import { wait } from './misc';
import { OrderDirection } from './database';
import { JobQueueConfig } from './config';
@ -242,14 +242,14 @@ const _processEvents = async (
// throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
// }
const watchedContract = indexer.isWatchedContract(event.contract);
const watchedContracts = indexer.isContractAddressWatched(event.contract);
if (watchedContract) {
if (watchedContracts) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContract.kind);
const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContracts);
if (eventParsed) {
updatedDbEvents.push(parsedEvent);
@ -353,14 +353,14 @@ const _processEventsInSubgraphOrder = async (
// Parse events of initially unwatched contracts
for (const event of unwatchedContractEvents) {
const watchedContract = indexer.isWatchedContract(event.contract);
const watchedContracts = indexer.isContractAddressWatched(event.contract);
if (watchedContract) {
if (watchedContracts) {
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
// Parse the unknown event and save updated event to the db
const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContract.kind);
const { eventParsed, event: parsedEvent } = _parseUnknownEvent(indexer, event, watchedContracts);
if (eventParsed) {
updatedDbEvents.push(parsedEvent);
@ -397,12 +397,14 @@ const _getEventsBatch = async (indexer: IndexerInterface, blockHash: string, eve
);
};
const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, contractKind: string): { eventParsed: boolean, event: EventInterface } => {
const _parseUnknownEvent = (indexer: IndexerInterface, event: EventInterface, watchedContracts: ContractInterface[]): { eventParsed: boolean, event: EventInterface } => {
const logObj = JSONbigNative.parse(event.extraInfo);
assert(indexer.parseEventNameAndArgs);
const { eventParsed, eventDetails: { eventName, eventInfo, eventSignature } } = indexer.parseEventNameAndArgs(contractKind, logObj);
const { eventParsed, eventDetails: { eventName, eventInfo, eventSignature } } = indexer.parseEventNameAndArgs(watchedContracts, logObj);
if (!eventParsed) {
// Skip unparsable events
log(`WARNING: Skipping event for contract ${event.contract} as no matching event found in the ABI`);
return { eventParsed: false, event };
}

View File

@ -665,7 +665,7 @@ export class Database {
async saveContract (repo: Repository<ContractInterface>, address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise<ContractInterface> {
const contract = await repo
.createQueryBuilder()
.where('address = :address', { address })
.where('address = :address AND kind = :kind', { address, kind })
.getOne();
const entity = repo.create({ address, kind, checkpoint, startingBlock, context });

View File

@ -117,7 +117,7 @@ export class Indexer {
_ethProvider: ethers.providers.JsonRpcProvider;
_jobQueue: JobQueue;
_watchedContracts: { [key: string]: ContractInterface } = {};
_watchedContractsByAddressMap: { [key: string]: ContractInterface[] } = {};
_stateStatusMap: { [key: string]: StateStatus } = {};
_currentEndpointIndex = {
@ -203,8 +203,12 @@ export class Indexer {
const contracts = await this._db.getContracts();
this._watchedContracts = contracts.reduce((acc: { [key: string]: ContractInterface }, contract) => {
acc[contract.address] = contract;
this._watchedContractsByAddressMap = contracts.reduce((acc: { [key: string]: ContractInterface[] }, contract) => {
if (!acc[contract.address]) {
acc[contract.address] = [];
}
acc[contract.address].push(contract);
return acc;
}, {});
@ -441,7 +445,7 @@ export class Indexer {
toBlock: number,
eventSignaturesMap: Map<string, string[]>,
parseEventNameAndArgs: (
kind: string,
watchedContracts: ContractInterface[],
logObj: { topics: string[]; data: string }
) => { eventParsed: boolean, eventDetails: any }
): Promise<{
@ -553,7 +557,7 @@ export class Indexer {
async fetchEvents (
blockHash: string, blockNumber: number,
eventSignaturesMap: Map<string, string[]>,
parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any }
parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any }
): Promise<{ events: DeepPartial<EventInterface>[], transactions: EthFullTransaction[]}> {
const { addresses, topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
@ -572,7 +576,7 @@ export class Indexer {
blockHash: string, blockNumber: number,
addresses: string[],
eventSignaturesMap: Map<string, string[]>,
parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any }
parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any }
): Promise<DeepPartial<EventInterface>[]> {
const { topics } = this._createLogsFilters(eventSignaturesMap);
const { logs, transactions } = await this._fetchLogsAndTransactions(blockHash, blockNumber, addresses, topics);
@ -618,7 +622,7 @@ export class Indexer {
createDbEventsFromLogsAndTxs (
blockHash: string,
logs: any, transactions: any,
parseEventNameAndArgs: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any }
parseEventNameAndArgs: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any }
): DeepPartial<EventInterface>[] {
const transactionMap: {[key: string]: any} = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => {
acc[transaction.txHash] = transaction;
@ -665,12 +669,13 @@ export class Indexer {
const extraInfo: { [key: string]: any } = { topics, data, tx, logIndex };
const contract = ethers.utils.getAddress(address);
const watchedContract = this.isWatchedContract(contract);
const watchedContracts = this.isContractAddressWatched(contract);
if (watchedContract) {
const { eventParsed, eventDetails } = parseEventNameAndArgs(watchedContract.kind, logObj);
if (watchedContracts) {
const { eventParsed, eventDetails } = parseEventNameAndArgs(watchedContracts, logObj);
if (!eventParsed) {
// Skip unparsable events
log(`WARNING: Skipping event for contract ${contract} as no matching event found in ABI`);
continue;
}
@ -856,19 +861,22 @@ export class Indexer {
return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
}
isWatchedContract (address : string): ContractInterface | undefined {
return this._watchedContracts[address];
isContractAddressWatched (address : string): ContractInterface[] | undefined {
return this._watchedContractsByAddressMap[address];
}
getContractsByKind (kind: string): ContractInterface[] {
const watchedContracts = Object.values(this._watchedContracts)
.filter(contract => contract.kind === kind);
const watchedContracts = Object.values(this._watchedContractsByAddressMap)
.reduce(
(acc, contracts) => acc.concat(contracts.filter(contract => contract.kind === kind)),
[]
);
return watchedContracts;
}
getWatchedContracts (): ContractInterface[] {
return Object.values(this._watchedContracts);
return Object.values(this._watchedContractsByAddressMap).flat();
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise<void> {
@ -902,7 +910,19 @@ export class Indexer {
}
cacheContract (contract: ContractInterface): void {
this._watchedContracts[contract.address] = contract;
if (!this._watchedContractsByAddressMap[contract.address]) {
this._watchedContractsByAddressMap[contract.address] = [];
}
// Check if contract with kind is already cached and skip
const isAlreadyCached = this._watchedContractsByAddressMap[contract.address]
.some(watchedContract => contract.id === watchedContract.id);
if (isAlreadyCached) {
return;
}
this._watchedContractsByAddressMap[contract.address].push(contract);
}
async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
@ -940,7 +960,7 @@ export class Indexer {
}
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
const [contracts] = Object.values(this._watchedContractsByAddressMap);
// Getting the block for checkpoint.
const block = await this.getBlockProgress(blockHash);
@ -994,10 +1014,11 @@ export class Indexer {
}
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
const watchedContracts = this._watchedContractsByAddressMap[contractAddress];
assert(watchedContracts, `Contract ${contractAddress} not watched`);
const [firstWatchedContract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock);
if (block.blockNumber < contract.startingBlock) {
if (block.blockNumber < firstWatchedContract.startingBlock) {
return;
}
@ -1016,10 +1037,13 @@ export class Indexer {
}
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
const watchedContractsByAddress = Object.values(this._watchedContractsByAddressMap);
// Create an initial state for each contract.
for (const contract of contracts) {
for (const watchedContracts of watchedContractsByAddress) {
// Get the first watched contract
const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock);
// Check if contract has checkpointing on.
if (contract.checkpoint) {
// Check if starting block not reached yet.
@ -1064,8 +1088,9 @@ export class Indexer {
assert(block);
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
const watchedContracts = this._watchedContractsByAddressMap[contractAddress];
assert(watchedContracts, `Contract ${contractAddress} not watched`);
const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock);
if (block.blockNumber < contract.startingBlock) {
return;
@ -1100,8 +1125,9 @@ export class Indexer {
}
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
const watchedContracts = this._watchedContractsByAddressMap[contractAddress];
assert(watchedContracts, `Contract ${contractAddress} not watched`);
const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock);
if (block.blockNumber < contract.startingBlock) {
return;
@ -1138,8 +1164,9 @@ export class Indexer {
}
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
const watchedContracts = this._watchedContractsByAddressMap[contractAddress];
assert(watchedContracts, `Contract ${contractAddress} not watched`);
const [contract] = watchedContracts.sort((a, b) => a.startingBlock - b.startingBlock);
if (currentBlock.blockNumber < contract.startingBlock) {
return;
@ -1341,16 +1368,16 @@ export class Indexer {
return;
}
const contracts = Object.values(this._watchedContracts);
const contractAddresses = Object.keys(this._watchedContractsByAddressMap);
// TODO: Fire a single query for all contracts.
for (const contract of contracts) {
const initState = await this._db.getLatestState(contract.address, StateKind.Init);
const diffState = await this._db.getLatestState(contract.address, StateKind.Diff);
const diffStagedState = await this._db.getLatestState(contract.address, StateKind.DiffStaged);
const checkpointState = await this._db.getLatestState(contract.address, StateKind.Checkpoint);
for (const contractAddress of contractAddresses) {
const initState = await this._db.getLatestState(contractAddress, StateKind.Init);
const diffState = await this._db.getLatestState(contractAddress, StateKind.Diff);
const diffStagedState = await this._db.getLatestState(contractAddress, StateKind.DiffStaged);
const checkpointState = await this._db.getLatestState(contractAddress, StateKind.Checkpoint);
this._stateStatusMap[contract.address] = {
this._stateStatusMap[contractAddress] = {
init: initState?.block.blockNumber,
diff: diffState?.block.blockNumber,
diff_staged: diffStagedState?.block.blockNumber,
@ -1372,7 +1399,7 @@ export class Indexer {
}
await this._db.deleteEntitiesByConditions(dbTx, 'contract', { startingBlock: MoreThan(blockNumber) });
this._clearWatchedContracts((watchedContracts) => watchedContracts.startingBlock > blockNumber);
this._clearWatchedContracts((watchedContract) => watchedContract.startingBlock > blockNumber);
await this._db.deleteEntitiesByConditions(dbTx, 'block_progress', { blockNumber: MoreThan(blockNumber) });
@ -1414,7 +1441,7 @@ export class Indexer {
}
}
async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockNumber: number }>[]): Promise<void> {
async clearProcessedBlockData (block: BlockProgressInterface, entities: EntityTarget<{ blockHash: string }>[]): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
@ -1434,11 +1461,15 @@ export class Indexer {
}
}
_clearWatchedContracts (removFilter: (watchedContract: ContractInterface) => boolean): void {
this._watchedContracts = Object.values(this._watchedContracts)
.filter(watchedContract => !removFilter(watchedContract))
.reduce((acc: {[key: string]: ContractInterface}, watchedContract) => {
acc[watchedContract.address] = watchedContract;
_clearWatchedContracts (removeFilter: (watchedContract: ContractInterface) => boolean): void {
this._watchedContractsByAddressMap = Object.entries(this._watchedContractsByAddressMap)
.map(([address, watchedContracts]): [string, ContractInterface[]] => [
address,
watchedContracts.filter(watchedContract => !removeFilter(watchedContract))
])
.filter(([, watchedContracts]) => watchedContracts.length)
.reduce((acc: {[key: string]: ContractInterface[]}, [address, watchedContracts]) => {
acc[address] = watchedContracts;
return acc;
}, {});

View File

@ -202,8 +202,8 @@ export interface IndexerInterface {
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
saveEvents (dbEvents: DeepPartial<EventInterface>[]): Promise<void>
processEvent (event: EventInterface, extraData: ExtraEventData): Promise<void>
parseEventNameAndArgs?: (kind: string, logObj: any) => { eventParsed: boolean, eventDetails: any }
isWatchedContract: (address: string) => ContractInterface | undefined;
parseEventNameAndArgs?: (watchedContracts: ContractInterface[], logObj: any) => { eventParsed: boolean, eventDetails: any }
isContractAddressWatched: (address: string) => ContractInterface[] | undefined;
getWatchedContracts: () => ContractInterface[]
getContractsByKind?: (kind: string) => ContractInterface[]
addContracts?: () => Promise<void>