Regenerate conditional-star-release-watcher with latest changes in watcher-ts and run in active mode (#33)

* Regenerate conditional star release watcher

* Update config data

---------

Co-authored-by: neeraj <neeraj.rtly@gmail.com>
This commit is contained in:
prathamesh0 2023-11-23 10:38:22 +05:30 committed by GitHub
parent facde5d370
commit f987e126fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 200 additions and 75 deletions

View File

@ -1,7 +1,8 @@
[server]
host = "127.0.0.1"
port = 3004
kind = "lazy"
kind = "active"
gqlPath = "/graphql"
# Checkpointing state.
checkpointing = true
@ -11,15 +12,15 @@
# Enable state creation
# CAUTION: Disable only if state creation is not desired or can be filled subsequently
enableState = true
# Boolean to filter logs by contract.
filterLogs = false
enableState = false
# Max block range for which to return events in eventsInRange GQL query.
# Use -1 for skipping check on block range.
maxEventsBlockRange = 1000
# Flag to specify whether RPC endpoint supports block hash as block tag parameter
rpcSupportsBlockHashParam = false
# GQL cache settings
[server.gqlCache]
enabled = true
@ -48,8 +49,19 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8083/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8082"
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
# Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
rpcClient = true
# Boolean flag to specify if rpcProviderEndpoint is an FEVM RPC endpoint
isFEVM = false
# Boolean flag to filter event logs by contracts
filterLogsByAddresses = true
# Boolean flag to filter event logs by topics
filterLogsByTopics = false
[upstream.cache]
name = "requests"
@ -61,6 +73,17 @@
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
eventsInBatch = 50
subgraphEventsOrder = true
blockDelayInMilliSecs = 2000
prefetchBlocksInMem = true
prefetchBlockCount = 10
# Boolean to switch between modes of processing events when starting the server.
# Setting to true will fetch filtered events and required blocks in a range of blocks and then process them.
# Setting to false will fetch blocks consecutively with its events and then process them (Behaviour is followed in realtime processing near head).
useBlockRanges = true
# Block range in which logs are fetched during historical blocks processing
historicalLogsBlockRange = 2000
# Max block range of historical processing after which it waits for completion of events processing
# If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead = 10000

View File

@ -38,28 +38,28 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.74",
"@cerc-io/ipld-eth-client": "^0.2.74",
"@cerc-io/solidity-mapper": "^0.2.74",
"@cerc-io/util": "^0.2.74",
"@ethersproject/providers": "^5.4.4",
"@cerc-io/cli": "^0.2.40",
"@cerc-io/ipld-eth-client": "^0.2.40",
"@cerc-io/solidity-mapper": "^0.2.40",
"@cerc-io/util": "^0.2.40",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",
"decimal.js": "^10.3.1",
"ethers": "^5.4.4",
"graphql": "^15.5.0",
"json-bigint": "^1.0.0",
"reflect-metadata": "^0.1.13",
"typeorm": "^0.2.32",
"yargs": "^17.0.1",
"decimal.js": "^10.3.1"
"typeorm": "0.2.37",
"yargs": "^17.0.1"
},
"devDependencies": {
"@ethersproject/abi": "^5.3.0",
"@types/yargs": "^17.0.0",
"@types/debug": "^4.1.5",
"@types/json-bigint": "^1.0.0",
"@types/yargs": "^17.0.0",
"@typescript-eslint/eslint-plugin": "^5.47.1",
"@typescript-eslint/parser": "^5.47.1",
"copyfiles": "^2.4.1",
"eslint": "^8.35.0",
"eslint-config-semistandard": "^15.0.1",
"eslint-config-standard": "^16.0.3",
@ -69,7 +69,6 @@
"eslint-plugin-standard": "^5.0.0",
"husky": "^7.0.2",
"ts-node": "^10.2.1",
"typescript": "^5.0.2",
"copyfiles": "^2.4.1"
"typescript": "^5.0.2"
}
}

View File

@ -334,10 +334,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock, context);
}
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@ -358,6 +358,18 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber, force);
}
async updateSyncStatusProcessedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusProcessedBlock(repo, blockHash, blockNumber, force);
}
async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus | undefined> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
const repo = queryRunner.manager.getRepository(SyncStatus);

View File

@ -13,8 +13,8 @@ export class BlockProgress implements BlockProgressInterface {
@PrimaryGeneratedColumn()
id!: number;
@Column('varchar')
cid!: string;
@Column('varchar', { nullable: true })
cid!: string | null;
@Column('varchar', { length: 66 })
blockHash!: string;

View File

@ -21,4 +21,7 @@ export class Contract {
@Column('integer')
startingBlock!: number;
@Column('jsonb', { nullable: true })
context!: Record<string, { data: any, type: number }>;
}

View File

@ -12,6 +12,6 @@ export class StateSyncStatus {
@Column('integer')
latestIndexedBlockNumber!: number;
@Column('integer', { nullable: true })
@Column('integer')
latestCheckpointBlockNumber!: number;
}

View File

@ -22,6 +22,12 @@ export class SyncStatus implements SyncStatusInterface {
@Column('integer')
latestIndexedBlockNumber!: number;
@Column('varchar', { length: 66 })
latestProcessedBlockHash!: string;
@Column('integer')
latestProcessedBlockNumber!: number;
@Column('varchar', { length: 66 })
latestCanonicalBlockHash!: string;
@ -33,4 +39,7 @@ export class SyncStatus implements SyncStatusInterface {
@Column('integer')
initialIndexedBlockNumber!: number;
@Column('boolean', { default: false })
hasIndexingError!: boolean;
}

View File

@ -4,5 +4,9 @@ query getSyncStatus{
latestIndexedBlockNumber
latestCanonicalBlockHash
latestCanonicalBlockNumber
initialIndexedBlockHash
initialIndexedBlockNumber
latestProcessedBlockHash
latestProcessedBlockNumber
}
}

View File

@ -13,6 +13,6 @@ export const getForfeited = fs.readFileSync(path.join(__dirname, 'getForfeited.g
export const hasForfeitedBatch = fs.readFileSync(path.join(__dirname, 'hasForfeitedBatch.gql'), 'utf8');
export const getRemainingStars = fs.readFileSync(path.join(__dirname, 'getRemainingStars.gql'), 'utf8');
export const getConditionsState = fs.readFileSync(path.join(__dirname, 'getConditionsState.gql'), 'utf8');
export const getSyncStatus = fs.readFileSync(path.join(__dirname, 'getSyncStatus.gql'), 'utf8');
export const getStateByCID = fs.readFileSync(path.join(__dirname, 'getStateByCID.gql'), 'utf8');
export const getState = fs.readFileSync(path.join(__dirname, 'getState.gql'), 'utf8');
export const getSyncStatus = fs.readFileSync(path.join(__dirname, 'getSyncStatus.gql'), 'utf8');

View File

@ -6,11 +6,10 @@ import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import debug from 'debug';
import JSONbig from 'json-bigint';
import { ethers } from 'ethers';
import { ethers, constants } from 'ethers';
import { JsonFragment } from '@ethersproject/abi';
import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import {
Indexer as BaseIndexer,
@ -25,7 +24,12 @@ import {
ResultEvent,
getResultEvent,
DatabaseInterface,
Clients
Clients,
EthClient,
UpstreamConfig,
EthFullBlock,
EthFullTransaction,
ExtraEventData
} from '@cerc-io/util';
import ConditionalStarReleaseArtifacts from './artifacts/ConditionalStarRelease.json';
@ -50,36 +54,60 @@ export class Indexer implements IndexerInterface {
_ethProvider: BaseProvider;
_baseIndexer: BaseIndexer;
_serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig;
_abiMap: Map<string, JsonFragment[]>;
_storageLayoutMap: Map<string, StorageLayout>;
_contractMap: Map<string, ethers.utils.Interface>;
eventSignaturesMap: Map<string, string[]>;
constructor (serverConfig: ServerConfig, db: DatabaseInterface, clients: Clients, ethProvider: BaseProvider, jobQueue: JobQueue) {
constructor (
config: {
server: ServerConfig;
upstream: UpstreamConfig;
},
db: DatabaseInterface,
clients: Clients,
ethProvider: BaseProvider,
jobQueue: JobQueue
) {
assert(db);
assert(clients.ethClient);
this._db = db as Database;
this._ethClient = clients.ethClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue);
this._serverConfig = config.server;
this._upstreamConfig = config.upstream;
this._baseIndexer = new BaseIndexer(config, this._db, this._ethClient, this._ethProvider, jobQueue);
this._abiMap = new Map();
this._storageLayoutMap = new Map();
this._contractMap = new Map();
this.eventSignaturesMap = new Map();
const { abi: ConditionalStarReleaseABI } = ConditionalStarReleaseArtifacts;
assert(ConditionalStarReleaseABI);
this._abiMap.set(KIND_CONDITIONALSTARRELEASE, ConditionalStarReleaseABI);
this._contractMap.set(KIND_CONDITIONALSTARRELEASE, new ethers.utils.Interface(ConditionalStarReleaseABI));
const ConditionalStarReleaseContractInterface = new ethers.utils.Interface(ConditionalStarReleaseABI);
this._contractMap.set(KIND_CONDITIONALSTARRELEASE, ConditionalStarReleaseContractInterface);
const ConditionalStarReleaseEventSignatures = Object.values(ConditionalStarReleaseContractInterface.events).map(value => {
return ConditionalStarReleaseContractInterface.getEventTopic(value);
});
this.eventSignaturesMap.set(KIND_CONDITIONALSTARRELEASE, ConditionalStarReleaseEventSignatures);
}
get serverConfig (): ServerConfig {
return this._serverConfig;
}
get upstreamConfig (): UpstreamConfig {
return this._upstreamConfig;
}
get storageLayoutMap (): Map<string, StorageLayout> {
return this._storageLayoutMap;
}
@ -515,16 +543,17 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.removeStates(blockNumber, kind);
}
async triggerIndexingOnEvent (event: Event): Promise<void> {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async triggerIndexingOnEvent (event: Event, extraData: ExtraEventData): Promise<void> {
const resultEvent = this.getResultEvent(event);
// Call custom hook function for indexing on event.
await handleEvent(this, resultEvent);
}
async processEvent (event: Event): Promise<void> {
async processEvent (event: Event, extraData: ExtraEventData): Promise<void> {
// Trigger indexing of data based on the event.
await this.triggerIndexingOnEvent(event);
await this.triggerIndexingOnEvent(event, extraData);
}
async processBlock (blockProgress: BlockProgress): Promise<void> {
@ -555,7 +584,11 @@ export class Indexer implements IndexerInterface {
return this._db.getStateSyncStatus();
}
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus | undefined> {
if (!this._serverConfig.enableState) {
return;
}
const dbTx = await this._db.createTransactionRunner();
let res;
@ -589,10 +622,14 @@ export class Indexer implements IndexerInterface {
return res;
}
async getLatestCanonicalBlock (): Promise<BlockProgress> {
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
if (syncStatus.latestCanonicalBlockHash === constants.HashZero) {
return;
}
const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);
@ -603,8 +640,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getLatestStateIndexedBlock();
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number, context?: any): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock, context);
}
updateStateStatusMap (address: string, stateStatus: StateStatus): void {
@ -619,6 +656,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.saveEventEntity(dbEvent);
}
async saveEvents (dbEvents: Event[]): Promise<void> {
return this._baseIndexer.saveEvents(dbEvents);
}
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -627,6 +668,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.isWatchedContract(address);
}
getWatchedContracts (): Contract[] {
return this._baseIndexer.getWatchedContracts();
}
getContractsByKind (kind: string): Contract[] {
return this._baseIndexer.getContractsByKind(kind);
}
@ -661,6 +706,14 @@ export class Indexer implements IndexerInterface {
return syncStatus;
}
async updateSyncStatusProcessedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusProcessedBlock(blockHash, blockNumber, force);
}
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus | undefined> {
return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
@ -677,7 +730,24 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
async fetchAndSaveFilteredEventsAndBlocks (startBlock: number, endBlock: number): Promise<{
blockProgress: BlockProgress,
events: DeepPartial<Event>[],
ethFullBlock: EthFullBlock;
ethFullTransactions: EthFullTransaction[];
}[]> {
return this._baseIndexer.fetchAndSaveFilteredEventsAndBlocks(startBlock, endBlock, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
}
async fetchEventsForContracts (blockHash: string, blockNumber: number, addresses: string[]): Promise<DeepPartial<Event>[]> {
return this._baseIndexer.fetchEventsForContracts(blockHash, blockNumber, addresses, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
}
async saveBlockAndFetchEvents (block: DeepPartial<BlockProgress>): Promise<[
BlockProgress,
DeepPartial<Event>[],
EthFullTransaction[]
]> {
return this._saveBlockAndFetchEvents(block);
}
@ -706,17 +776,26 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
async clearProcessedBlockData (block: BlockProgress): Promise<void> {
const entities = [...ENTITIES];
await this._baseIndexer.clearProcessedBlockData(block, entities);
}
async _saveBlockAndFetchEvents ({
cid: blockCid,
blockHash,
blockNumber,
blockTimestamp,
parentHash
}: DeepPartial<BlockProgress>): Promise<[BlockProgress, DeepPartial<Event>[]]> {
}: DeepPartial<BlockProgress>): Promise<[
BlockProgress,
DeepPartial<Event>[],
EthFullTransaction[]
]> {
assert(blockHash);
assert(blockNumber);
const dbEvents = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this.parseEventNameAndArgs.bind(this));
const { events: dbEvents, transactions } = await this._baseIndexer.fetchEvents(blockHash, blockNumber, this.eventSignaturesMap, this.parseEventNameAndArgs.bind(this));
const dbTx = await this._db.createTransactionRunner();
try {
@ -733,7 +812,7 @@ export class Indexer implements IndexerInterface {
await dbTx.commitTransaction();
console.timeEnd(`time:indexer#_saveBlockAndFetchEvents-db-save-${blockNumber}`);
return [blockProgress, []];
return [blockProgress, [], transactions];
} catch (error) {
await dbTx.rollbackTransaction();
throw error;

View File

@ -20,6 +20,7 @@ export const main = async (): Promise<any> => {
await jobRunnerCmd.exec(async (jobRunner: JobRunner): Promise<void> => {
await jobRunner.subscribeBlockProcessingQueue();
await jobRunner.subscribeHistoricalProcessingQueue();
await jobRunner.subscribeEventProcessingQueue();
await jobRunner.subscribeBlockCheckpointQueue();
await jobRunner.subscribeHooksQueue();

View File

@ -3,13 +3,8 @@
//
import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';
import Decimal from 'decimal.js';
import {
GraphQLScalarType,
GraphQLResolveInfo
} from 'graphql';
import { GraphQLResolveInfo } from 'graphql';
import {
ValueResult,
@ -17,6 +12,8 @@ import {
gqlQueryCount,
getResultState,
IndexerInterface,
GraphQLBigInt,
GraphQLBigDecimal,
EventWatcher,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
setGQLCacheHints
@ -33,20 +30,9 @@ export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher
const gqlCacheConfig = indexer.serverConfig.gqlCache;
return {
BigInt: new BigInt('bigInt'),
BigInt: GraphQLBigInt,
BigDecimal: new GraphQLScalarType({
name: 'BigDecimal',
description: 'BigDecimal custom scalar type',
parseValue (value) {
// value from the client
return new Decimal(value);
},
serialize (value: Decimal) {
// value sent to the client
return value.toFixed();
}
}),
BigDecimal: GraphQLBigDecimal,
Event: {
__resolveType: (obj: any) => {
@ -271,9 +257,14 @@ export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher
gqlTotalQueryCount.inc(1);
gqlQueryCount.labels('eventsInRange').inc(1);
const { expected, actual } = await indexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
if (expected !== actual) {
throw new Error(`Range not available, expected ${expected}, got ${actual} blocks in range`);
const syncStatus = await indexer.getSyncStatus();
if (!syncStatus) {
throw new Error('No blocks processed yet');
}
if ((fromBlockNumber < syncStatus.initialIndexedBlockNumber) || (toBlockNumber > syncStatus.latestProcessedBlockNumber)) {
throw new Error(`Block range should be between ${syncStatus.initialIndexedBlockNumber} and ${syncStatus.latestProcessedBlockNumber}`);
}
const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber);

View File

@ -16,7 +16,7 @@ type Proof {
}
type _Block_ {
cid: String!
cid: String
hash: String!
number: Int!
timestamp: Int!
@ -93,13 +93,6 @@ type GetConditionsStateType {
value3: [BigInt!]!
}
type SyncStatus {
latestIndexedBlockHash: String!
latestIndexedBlockNumber: Int!
latestCanonicalBlockHash: String!
latestCanonicalBlockNumber: Int!
}
type ResultState {
block: _Block_!
contractAddress: String!
@ -108,6 +101,17 @@ type ResultState {
data: String!
}
type SyncStatus {
latestIndexedBlockHash: String!
latestIndexedBlockNumber: Int!
latestCanonicalBlockHash: String!
latestCanonicalBlockNumber: Int!
initialIndexedBlockHash: String!
initialIndexedBlockNumber: Int!
latestProcessedBlockHash: String!
latestProcessedBlockNumber: Int!
}
type Query {
events(blockHash: String!, contractAddress: String!, name: String): [ResultEvent!]
eventsInRange(fromBlockNumber: Int!, toBlockNumber: Int!): [ResultEvent!]
@ -121,9 +125,9 @@ type Query {
hasForfeitedBatch(blockHash: String!, contractAddress: String!, _participant: String!, _batch: Int!): ResultBoolean!
getRemainingStars(blockHash: String!, contractAddress: String!, _participant: String!): ResultIntArray!
getConditionsState(blockHash: String!, contractAddress: String!): ResultGetConditionsStateType!
getSyncStatus: SyncStatus
getStateByCID(cid: String!): ResultState
getState(blockHash: String!, contractAddress: String!, kind: String): ResultState
getSyncStatus: SyncStatus
}
type Mutation {