Refactor import-state CLI to cli package (#250)

* Remove unnecessary upstream config arg to event watcher

* Initialize event watcher in CLI package

* Refactor import-state CLI to cli package
This commit is contained in:
prathamesh0 2022-11-22 05:11:15 -06:00 committed by GitHub
parent 6622d0874e
commit 6737ec756c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 369 additions and 513 deletions

View File

@ -5,6 +5,7 @@
import 'reflect-metadata'; import 'reflect-metadata';
import assert from 'assert'; import assert from 'assert';
import { ConnectionOptions } from 'typeorm'; import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import { JsonRpcProvider } from '@ethersproject/providers'; import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
@ -17,15 +18,44 @@ import {
IndexerInterface, IndexerInterface,
ServerConfig, ServerConfig,
Database as BaseDatabase, Database as BaseDatabase,
Clients Clients,
EventWatcherInterface
} from '@cerc-io/util'; } from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
export class BaseCmd { export class BaseCmd {
_config?: Config; _config?: Config;
_clients?: Clients; _clients?: Clients;
_ethProvider?: JsonRpcProvider; _ethProvider?: JsonRpcProvider;
_jobQueue?: JobQueue
_database?: DatabaseInterface; _database?: DatabaseInterface;
_indexer?: IndexerInterface; _indexer?: IndexerInterface;
_graphDb?: GraphDatabase
_eventWatcher?: EventWatcherInterface
get config (): Config | undefined {
return this._config;
}
get jobQueue (): JobQueue | undefined {
return this._jobQueue;
}
get database (): DatabaseInterface | undefined {
return this._database;
}
get graphDb (): GraphDatabase | undefined {
return this._graphDb;
}
get indexer (): IndexerInterface | undefined {
return this._indexer;
}
get eventWatcher (): EventWatcherInterface | undefined {
return this._eventWatcher;
}
async initConfig<ConfigType> (configFile: string): Promise<ConfigType> { async initConfig<ConfigType> (configFile: string): Promise<ConfigType> {
if (!this._config) { if (!this._config) {
@ -49,10 +79,7 @@ export class BaseCmd {
graphWatcher?: GraphWatcher graphWatcher?: GraphWatcher
) => IndexerInterface, ) => IndexerInterface,
clients: { [key: string]: any } = {} clients: { [key: string]: any } = {}
): Promise<{ ): Promise<void> {
database: DatabaseInterface,
indexer: IndexerInterface
}> {
assert(this._config); assert(this._config);
this._database = new Database(this._config.database, this._config.server); this._database = new Database(this._config.database, this._config.server);
@ -64,8 +91,8 @@ export class BaseCmd {
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string'); assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); this._jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start(); await this._jobQueue.start();
const { ethClient, ethProvider } = await initClients(this._config); const { ethClient, ethProvider } = await initClients(this._config);
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
@ -74,17 +101,33 @@ export class BaseCmd {
// Check if subgraph watcher. // Check if subgraph watcher.
if (this._config.server.subgraphPath) { if (this._config.server.subgraphPath) {
const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase); const graphWatcher = await this._getGraphWatcher(this._database.baseDatabase);
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue, graphWatcher); this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, this._jobQueue, graphWatcher);
await this._indexer.init(); await this._indexer.init();
graphWatcher.setIndexer(this._indexer); graphWatcher.setIndexer(this._indexer);
await graphWatcher.init(); await graphWatcher.init();
} else { } else {
this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, jobQueue); this._indexer = new Indexer(this._config.server, this._database, this._clients, ethProvider, this._jobQueue);
await this._indexer.init(); await this._indexer.init();
} }
}
return { database: this._database, indexer: this._indexer }; async initEventWatcher (
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface
): Promise<void> {
assert(this._clients?.ethClient);
assert(this._indexer);
assert(this._jobQueue);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
this._eventWatcher = new EventWatcher(this._clients.ethClient, this._indexer, pubsub, this._jobQueue);
} }
async _getGraphWatcher (baseDatabase: BaseDatabase): Promise<GraphWatcher> { async _getGraphWatcher (baseDatabase: BaseDatabase): Promise<GraphWatcher> {
@ -92,9 +135,9 @@ export class BaseCmd {
assert(this._clients?.ethClient); assert(this._clients?.ethClient);
assert(this._ethProvider); assert(this._ethProvider);
const graphDb = new GraphDatabase(this._config.server, baseDatabase); this._graphDb = new GraphDatabase(this._config.server, baseDatabase);
await graphDb.init(); await this._graphDb.init();
return new GraphWatcher(graphDb, this._clients.ethClient, this._ethProvider, this._config.server); return new GraphWatcher(this._graphDb, this._clients.ethClient, this._ethProvider, this._config.server);
} }
} }

View File

@ -60,17 +60,21 @@ export class CreateCheckpointCmd {
this._argv = argv; this._argv = argv;
await this.initConfig(argv.configFile); await this.initConfig(argv.configFile);
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); await this._baseCmd.init(Database, Indexer, clients);
} }
async exec (): Promise<void> { async exec (): Promise<void> {
assert(this._argv); assert(this._argv);
assert(this._database);
assert(this._indexer);
const blockHash = await this._indexer.processCLICheckpoint(this._argv.address, this._argv.blockHash); const database = this._baseCmd.database;
const indexer = this._baseCmd.indexer;
await this._database.close(); assert(database);
assert(indexer);
const blockHash = await indexer.processCLICheckpoint(this._argv.address, this._argv.blockHash);
await database.close();
log(`Created a checkpoint for contract ${this._argv.address} at block-hash ${blockHash}`); log(`Created a checkpoint for contract ${this._argv.address} at block-hash ${blockHash}`);
} }
} }

View File

@ -0,0 +1,181 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import yargs from 'yargs';
import 'reflect-metadata';
import assert from 'assert';
import path from 'path';
import fs from 'fs';
import debug from 'debug';
import { ConnectionOptions } from 'typeorm';
import { PubSub } from 'graphql-subscriptions';
import { JsonRpcProvider } from '@ethersproject/providers';
import { GraphWatcher, updateEntitiesFromState } from '@cerc-io/graph-node';
import { EthClient } from '@cerc-io/ipld-eth-client';
import {
DEFAULT_CONFIG_PATH,
JobQueue,
DatabaseInterface,
IndexerInterface,
ServerConfig,
Clients,
EventWatcherInterface,
fillBlocks,
StateKind
} from '@cerc-io/util';
import * as codec from '@ipld/dag-cbor';
import { BaseCmd } from './base';
const log = debug('vulcanize:import-state');
interface Arguments {
configFile: string;
importFile: string;
}
export class ImportStateCmd {
_argv?: Arguments
_baseCmd: BaseCmd;
constructor () {
this._baseCmd = new BaseCmd();
}
async initConfig<ConfigType> (): Promise<ConfigType> {
this._argv = this._getArgv();
assert(this._argv);
return this._baseCmd.initConfig(this._argv.configFile);
}
async init (
Database: new (config: ConnectionOptions,
serverConfig?: ServerConfig
) => DatabaseInterface,
Indexer: new (
serverConfig: ServerConfig,
db: DatabaseInterface,
clients: Clients,
ethProvider: JsonRpcProvider,
jobQueue: JobQueue,
graphWatcher?: GraphWatcher
) => IndexerInterface,
EventWatcher: new(
ethClient: EthClient,
indexer: IndexerInterface,
pubsub: PubSub,
jobQueue: JobQueue
) => EventWatcherInterface,
clients: { [key: string]: any } = {}
): Promise<void> {
await this.initConfig();
await this._baseCmd.init(Database, Indexer, clients);
await this._baseCmd.initEventWatcher(EventWatcher);
}
async exec (State: new() => any): Promise<void> {
assert(this._argv);
const config = this._baseCmd.config;
const jobQueue = this._baseCmd.jobQueue;
const database = this._baseCmd.database;
const indexer = this._baseCmd.indexer;
const eventWatcher = this._baseCmd.eventWatcher;
assert(config);
assert(jobQueue);
assert(database);
assert(indexer);
assert(eventWatcher);
// Import data.
const importFilePath = path.resolve(this._argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;
// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
config.jobQueue.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);
// Fill the Contracts.
for (const contract of importData.contracts) {
indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);
// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();
state = Object.assign(state, checkpoint);
state.block = block;
state.data = Buffer.from(codec.encode(state.data));
state = await indexer.saveOrUpdateState(state);
// Fill entities using State if:
// relationsMap defined for the watcher,
// graphDb instance is avaiable
// TODO: Fill latest entity tables
if (indexer.getRelationsMap) {
if (this._baseCmd.graphDb) {
await updateEntitiesFromState(this._baseCmd.graphDb, indexer, state);
} else if (database.graphDatabase) {
await updateEntitiesFromState(database.graphDatabase, indexer, state);
}
}
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
await database.close();
}
_getArgv (): any {
return yargs.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'Configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;
}
}

View File

@ -7,3 +7,4 @@ export * from './reset/watcher';
export * from './reset/state'; export * from './reset/state';
export * from './checkpoint/create'; export * from './checkpoint/create';
export * from './inspect-cid'; export * from './inspect-cid';
export * from './import-state';

View File

@ -32,8 +32,6 @@ interface Arguments {
export class InspectCIDCmd { export class InspectCIDCmd {
_argv?: Arguments _argv?: Arguments
_baseCmd: BaseCmd; _baseCmd: BaseCmd;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
constructor () { constructor () {
this._baseCmd = new BaseCmd(); this._baseCmd = new BaseCmd();
@ -63,21 +61,25 @@ export class InspectCIDCmd {
): Promise<void> { ): Promise<void> {
await this.initConfig(); await this.initConfig();
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); await this._baseCmd.init(Database, Indexer, clients);
} }
async exec (): Promise<void> { async exec (): Promise<void> {
assert(this._argv); assert(this._argv);
assert(this._database);
assert(this._indexer);
const state = await this._indexer.getStateByCID(this._argv.cid); const database = this._baseCmd.database;
const indexer = this._baseCmd.indexer;
assert(database);
assert(indexer);
const state = await indexer.getStateByCID(this._argv.cid);
assert(state, 'State for the provided CID doesn\'t exist.'); assert(state, 'State for the provided CID doesn\'t exist.');
const stateData = await this._indexer.getStateData(state); const stateData = await indexer.getStateData(state);
log(util.inspect(stateData, false, null)); log(util.inspect(stateData, false, null));
await this._database.close(); await database.close();
} }
_getArgv (): any { _getArgv (): any {

View File

@ -29,8 +29,6 @@ interface Arguments {
export class ResetWatcherCmd { export class ResetWatcherCmd {
_argv?: Arguments _argv?: Arguments
_baseCmd: BaseCmd; _baseCmd: BaseCmd;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
constructor () { constructor () {
this._baseCmd = new BaseCmd(); this._baseCmd = new BaseCmd();
@ -59,17 +57,21 @@ export class ResetWatcherCmd {
this._argv = argv; this._argv = argv;
await this.initConfig(argv.configFile); await this.initConfig(argv.configFile);
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); await this._baseCmd.init(Database, Indexer, clients);
} }
async exec (): Promise<void> { async exec (): Promise<void> {
assert(this._argv); assert(this._argv);
assert(this._database);
assert(this._indexer);
await this._indexer.resetWatcherToBlock(this._argv.blockNumber); const database = this._baseCmd.database;
const indexer = this._baseCmd.indexer;
await this._database.close(); assert(database);
assert(indexer);
await indexer.resetWatcherToBlock(this._argv.blockNumber);
await database.close();
log('Reset watcher successfully'); log('Reset watcher successfully');
} }
} }

View File

@ -31,8 +31,6 @@ interface Arguments {
export class WatchContractCmd { export class WatchContractCmd {
_argv?: Arguments; _argv?: Arguments;
_baseCmd: BaseCmd; _baseCmd: BaseCmd;
_database?: DatabaseInterface;
_indexer?: IndexerInterface;
constructor () { constructor () {
this._baseCmd = new BaseCmd(); this._baseCmd = new BaseCmd();
@ -62,17 +60,20 @@ export class WatchContractCmd {
): Promise<void> { ): Promise<void> {
await this.initConfig(); await this.initConfig();
({ database: this._database, indexer: this._indexer } = await this._baseCmd.init(Database, Indexer, clients)); await this._baseCmd.init(Database, Indexer, clients);
} }
async exec (): Promise<void> { async exec (): Promise<void> {
assert(this._argv); assert(this._argv);
assert(this._database);
assert(this._indexer);
assert(this._indexer.watchContract);
await this._indexer.watchContract(this._argv.address, this._argv.kind, this._argv.checkpoint, this._argv.startingBlock); const database = this._baseCmd.database;
await this._database.close(); const indexer = this._baseCmd.indexer;
assert(database);
assert(indexer);
await indexer.watchContract(this._argv.address, this._argv.kind, this._argv.checkpoint, this._argv.startingBlock);
await database.close();
} }
_getArgv (): any { _getArgv (): any {

View File

@ -11,8 +11,7 @@ import {
EventWatcher as BaseEventWatcher, EventWatcher as BaseEventWatcher,
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING
UpstreamConfig
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,7 +24,7 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
@ -33,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface {
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -106,7 +106,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -77,7 +77,7 @@ export const main = async (): Promise<any> => {
await graphWatcher.init(); await graphWatcher.init();
{{/if}} {{/if}}
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
// Import data. // Import data.
const importFilePath = path.resolve(argv.importFile); const importFilePath = path.resolve(argv.importFile);

View File

@ -71,7 +71,7 @@ export const main = async (): Promise<any> => {
await graphWatcher.init(); await graphWatcher.init();
{{/if}} {{/if}}
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -2,20 +2,12 @@
// Copyright 2021 Vulcanize, Inc. // Copyright 2021 Vulcanize, Inc.
// //
import assert from 'assert';
import 'reflect-metadata'; import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import fs from 'fs';
import path from 'path';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; import { ImportStateCmd } from '@cerc-io/cli';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Database } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
import { EventWatcher } from '../events'; import { EventWatcher } from '../events';
import { State } from '../entity/State'; import { State } from '../entity/State';
@ -23,109 +15,10 @@ import { State } from '../entity/State';
const log = debug('vulcanize:import-state'); const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => { export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({ const importStateCmd = new ImportStateCmd();
'parse-numbers': false await importStateCmd.init(Database, Indexer, EventWatcher);
}).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;
const config: Config = await getConfig(argv.configFile); await importStateCmd.exec(State);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;
// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);
// Fill the Contracts.
for (const contract of importData.contracts) {
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);
// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();
state = Object.assign(state, checkpoint);
state.block = block;
state.data = Buffer.from(codec.encode(state.data));
state = await indexer.saveOrUpdateState(state);
await graphWatcher.updateEntitiesFromState(state);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
}; };
main().catch(err => { main().catch(err => {

View File

@ -12,7 +12,7 @@ import {
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UpstreamConfig IndexerInterface
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer as Indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -96,7 +96,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -758,7 +758,6 @@ export class Indexer implements IndexerInterface {
} }
_populateRelationsMap (): void { _populateRelationsMap (): void {
// Needs to be generated by codegen.
this._relationsMap.set(ProducerSet, { this._relationsMap.set(ProducerSet, {
producers: { producers: {
entity: Producer, entity: Producer,

View File

@ -65,7 +65,7 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer); graphWatcher.setIndexer(indexer);
await graphWatcher.init(); await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -11,8 +11,7 @@ import {
EventWatcher as BaseEventWatcher, EventWatcher as BaseEventWatcher,
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING
UpstreamConfig
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,7 +24,7 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
@ -33,7 +32,7 @@ export class EventWatcher implements EventWatcherInterface {
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -76,7 +76,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -12,7 +12,19 @@ import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client'; import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, JobQueue, Where, QueryOptions, ServerConfig, StateStatus, DatabaseInterface, Clients } from '@cerc-io/util'; import {
IndexerInterface,
Indexer as BaseIndexer,
ValueResult,
JobQueue,
Where,
QueryOptions,
ServerConfig,
StateStatus,
DatabaseInterface,
Clients,
StateKind
} from '@cerc-io/util';
import { Database, ENTITIES } from './database'; import { Database, ENTITIES } from './database';
import { Event } from './entity/Event'; import { Event } from './entity/Event';
@ -273,6 +285,14 @@ export class Indexer implements IndexerInterface {
return undefined; return undefined;
} }
async saveOrUpdateState (state: State): Promise<State> {
return {} as State;
}
async removeStates (blockNumber: number, kind: StateKind): Promise<void> {
// TODO Implement
}
getStateData (state: State): any { getStateData (state: State): any {
return this._baseIndexer.getStateData(state); return this._baseIndexer.getStateData(state);
} }

View File

@ -57,7 +57,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init(); await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -2,17 +2,10 @@
// Copyright 2021 Vulcanize, Inc. // Copyright 2021 Vulcanize, Inc.
// //
import assert from 'assert';
import 'reflect-metadata'; import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import fs from 'fs';
import path from 'path';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; import { ImportStateCmd } from '@cerc-io/cli';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database'; import { Database } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
@ -22,100 +15,10 @@ import { State } from '../entity/State';
const log = debug('vulcanize:import-state'); const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => { export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({ const importStateCmd = new ImportStateCmd();
'parse-numbers': false await importStateCmd.init(Database, Indexer, EventWatcher);
}).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;
const config: Config = await getConfig(argv.configFile); await importStateCmd.exec(State);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;
// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);
// Fill the Contracts.
for (const contract of importData.contracts) {
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);
// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();
state = Object.assign(state, checkpoint);
state.block = block;
state.data = Buffer.from(codec.encode(state.data));
state = await indexer.saveOrUpdateState(state);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
}; };
main().catch(err => { main().catch(err => {

View File

@ -12,7 +12,7 @@ import {
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UpstreamConfig IndexerInterface
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer as Indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -56,7 +56,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init(); await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -348,6 +348,7 @@ export class GraphWatcher {
} }
} }
// TODO Remove after updating codegen CLIs
async updateEntitiesFromState (state: StateInterface) { async updateEntitiesFromState (state: StateInterface) {
assert(this._indexer); assert(this._indexer);
await updateEntitiesFromState(this._database, this._indexer, state); await updateEntitiesFromState(this._database, this._indexer, state);

View File

@ -13,7 +13,8 @@ import {
StateSyncStatusInterface, StateSyncStatusInterface,
StateInterface, StateInterface,
getResultEvent, getResultEvent,
ResultEvent ResultEvent,
StateKind
} from '@cerc-io/util'; } from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client'; import { EthClient } from '@cerc-io/ipld-eth-client';
import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
@ -188,6 +189,10 @@ export class Indexer implements IndexerInterface {
return undefined; return undefined;
} }
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return undefined;
}
async processBlock (blockProgress: BlockProgressInterface): Promise<void> { async processBlock (blockProgress: BlockProgressInterface): Promise<void> {
return undefined; return undefined;
} }
@ -208,6 +213,14 @@ export class Indexer implements IndexerInterface {
return undefined; return undefined;
} }
async saveOrUpdateState (state: StateInterface): Promise<StateInterface> {
return {} as StateInterface;
}
async removeStates (blockNumber: number, kind: StateKind): Promise<void> {
// TODO Implement
}
getStateData (state: StateInterface): any { getStateData (state: StateInterface): any {
return undefined; return undefined;
} }

View File

@ -2,20 +2,12 @@
// Copyright 2021 Vulcanize, Inc. // Copyright 2021 Vulcanize, Inc.
// //
import assert from 'assert';
import 'reflect-metadata'; import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import fs from 'fs';
import path from 'path';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; import { ImportStateCmd } from '@cerc-io/cli';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP } from '../database'; import { Database } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
import { EventWatcher } from '../events'; import { EventWatcher } from '../events';
import { State } from '../entity/State'; import { State } from '../entity/State';
@ -23,109 +15,10 @@ import { State } from '../entity/State';
const log = debug('vulcanize:import-state'); const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => { export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({ const importStateCmd = new ImportStateCmd();
'parse-numbers': false await importStateCmd.init(Database, Indexer, EventWatcher);
}).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;
const config: Config = await getConfig(argv.configFile); await importStateCmd.exec(State);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;
// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);
// Fill the Contracts.
for (const contract of importData.contracts) {
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);
// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();
state = Object.assign(state, checkpoint);
state.block = block;
state.data = Buffer.from(codec.encode(state.data));
state = await indexer.saveOrUpdateState(state);
await graphWatcher.updateEntitiesFromState(state);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
}; };
main().catch(err => { main().catch(err => {

View File

@ -12,7 +12,7 @@ import {
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UpstreamConfig IndexerInterface
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer as Indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -96,7 +96,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -616,7 +616,6 @@ export class Indexer implements IndexerInterface {
} }
_populateRelationsMap (): void { _populateRelationsMap (): void {
// Needs to be generated by codegen.
this._relationsMap.set(Author, { this._relationsMap.set(Author, {
blogs: { blogs: {
entity: Blog, entity: Blog,

View File

@ -65,7 +65,7 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer); graphWatcher.setIndexer(indexer);
await graphWatcher.init(); await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -2,17 +2,10 @@
// Copyright 2021 Vulcanize, Inc. // Copyright 2021 Vulcanize, Inc.
// //
import assert from 'assert';
import 'reflect-metadata'; import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug'; import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import fs from 'fs';
import path from 'path';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@cerc-io/util'; import { ImportStateCmd } from '@cerc-io/cli';
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database'; import { Database } from '../database';
import { Indexer } from '../indexer'; import { Indexer } from '../indexer';
@ -22,100 +15,10 @@ import { State } from '../entity/State';
const log = debug('vulcanize:import-state'); const log = debug('vulcanize:import-state');
export const main = async (): Promise<any> => { export const main = async (): Promise<any> => {
const argv = await yargs(hideBin(process.argv)).parserConfiguration({ const importStateCmd = new ImportStateCmd();
'parse-numbers': false await importStateCmd.init(Database, Indexer, EventWatcher);
}).options({
configFile: {
alias: 'f',
type: 'string',
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
importFile: {
alias: 'i',
type: 'string',
demandOption: true,
describe: 'Import file path (JSON)'
}
}).argv;
const config: Config = await getConfig(argv.configFile); await importStateCmd.exec(State);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue);
// Import data.
const importFilePath = path.resolve(argv.importFile);
const encodedImportData = fs.readFileSync(importFilePath);
const importData = codec.decode(Buffer.from(encodedImportData)) as any;
// Fill the snapshot block.
await fillBlocks(
jobQueue,
indexer,
eventWatcher,
jobQueueConfig.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
);
// Fill the Contracts.
for (const contract of importData.contracts) {
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.
const block = await indexer.getBlockProgress(importData.snapshotBlock.blockHash);
assert(block);
// Fill the States.
for (const checkpoint of importData.stateCheckpoints) {
let state = new State();
state = Object.assign(state, checkpoint);
state.block = block;
state.data = Buffer.from(codec.encode(state.data));
state = await indexer.saveOrUpdateState(state);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateStateSyncStatusIndexedBlock(block.blockNumber);
await indexer.updateStateSyncStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' State entries are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStates(block.blockNumber, StateKind.Init);
await indexer.removeStates(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
}; };
main().catch(err => { main().catch(err => {

View File

@ -12,7 +12,7 @@ import {
EventWatcherInterface, EventWatcherInterface,
QUEUE_BLOCK_PROCESSING, QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING, QUEUE_EVENT_PROCESSING,
UpstreamConfig IndexerInterface
} from '@cerc-io/util'; } from '@cerc-io/util';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
@ -25,15 +25,15 @@ export class EventWatcher implements EventWatcherInterface {
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
assert(ethClient); assert(ethClient);
assert(indexer); assert(indexer);
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer as Indexer;
this._pubsub = pubsub; this._pubsub = pubsub;
this._jobQueue = jobQueue; this._jobQueue = jobQueue;
this._baseEventWatcher = new BaseEventWatcher(upstreamConfig, this._ethClient, this._indexer, this._pubsub, this._jobQueue); this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
} }
getEventIterator (): AsyncIterator<any> { getEventIterator (): AsyncIterator<any> {

View File

@ -74,7 +74,7 @@ export const main = async (): Promise<any> => {
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub(); const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv); await fillBlocks(jobQueue, indexer, eventWatcher, jobQueueConfig.blockDelayInMilliSecs, argv);
}; };

View File

@ -56,7 +56,7 @@ export const main = async (): Promise<any> => {
const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue); const indexer = new Indexer(config.server, db, { ethClient }, ethProvider, jobQueue);
await indexer.init(); await indexer.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, indexer, pubsub, jobQueue); const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) { if (watcherKind === KIND_ACTIVE) {
await jobQueue.start(); await jobQueue.start();

View File

@ -12,7 +12,6 @@ import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types'; import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants'; import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING } from './constants';
import { createPruningJob, processBlockByNumber } from './common'; import { createPruningJob, processBlockByNumber } from './common';
import { UpstreamConfig } from './config';
import { OrderDirection } from './database'; import { OrderDirection } from './database';
const EVENT = 'event'; const EVENT = 'event';
@ -27,10 +26,8 @@ export class EventWatcher {
_subscription?: ZenObservable.Subscription _subscription?: ZenObservable.Subscription
_pubsub: PubSub _pubsub: PubSub
_jobQueue: JobQueue _jobQueue: JobQueue
_upstreamConfig: UpstreamConfig
constructor (upstreamConfig: UpstreamConfig, ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) { constructor (ethClient: EthClient, indexer: IndexerInterface, pubsub: PubSub, jobQueue: JobQueue) {
this._upstreamConfig = upstreamConfig;
this._ethClient = ethClient; this._ethClient = ethClient;
this._indexer = indexer; this._indexer = indexer;
this._pubsub = pubsub; this._pubsub = pubsub;

View File

@ -109,7 +109,7 @@ export interface IndexerInterface {
isWatchedContract: (address: string) => ContractInterface | undefined; isWatchedContract: (address: string) => ContractInterface | undefined;
getContractsByKind?: (kind: string) => ContractInterface[] getContractsByKind?: (kind: string) => ContractInterface[]
cacheContract?: (contract: ContractInterface) => void; cacheContract?: (contract: ContractInterface) => void;
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void> watchContract: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
getEntityTypesMap?: () => Map<string, { [key: string]: string }> getEntityTypesMap?: () => Map<string, { [key: string]: string }>
getRelationsMap?: () => Map<any, { [key: string]: any }> getRelationsMap?: () => Map<any, { [key: string]: any }>
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void> createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
@ -125,6 +125,8 @@ export interface IndexerInterface {
updateStateStatusMap (address: string, stateStatus: StateStatus): void updateStateStatusMap (address: string, stateStatus: StateStatus): void
getStateData (state: StateInterface): any getStateData (state: StateInterface): any
getStateByCID (cid: string): Promise<StateInterface | undefined> getStateByCID (cid: string): Promise<StateInterface | undefined>
saveOrUpdateState (state: StateInterface): Promise<StateInterface>
removeStates (blockNumber: number, kind: StateKind): Promise<void>
resetWatcherToBlock (blockNumber: number): Promise<void> resetWatcherToBlock (blockNumber: number): Promise<void>
getResultEvent (event: EventInterface): any getResultEvent (event: EventInterface): any
} }
@ -138,6 +140,7 @@ export interface EventWatcherInterface {
export interface DatabaseInterface { export interface DatabaseInterface {
_conn: Connection; _conn: Connection;
readonly baseDatabase: Database readonly baseDatabase: Database
readonly graphDatabase?: any
init (): Promise<void>; init (): Promise<void>;
close (): Promise<void>; close (): Promise<void>;
createTransactionRunner (): Promise<QueryRunner>; createTransactionRunner (): Promise<QueryRunner>;