Add watched contract from subgraph yaml on startup (#56)

This commit is contained in:
prathamesh0 2021-11-18 18:03:36 +05:30 committed by nabarun
parent af259a32f0
commit 44b3fd59e8
35 changed files with 314 additions and 218 deletions

View File

@ -6,8 +6,9 @@ import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -50,7 +51,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -9,7 +9,7 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
@ -48,7 +48,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
@ -59,7 +68,7 @@ const main = async (): Promise<void> => {
ipldCheckpoints: []
};
const contracts = await db.getContracts({});
const contracts = await db.getContracts();
// Get latest canonical block.
const block = await indexer.getLatestCanonicalBlock();

View File

@ -55,10 +55,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -69,6 +65,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
// Import data.
@ -91,7 +92,7 @@ export const main = async (): Promise<any> => {
// Fill the Contracts.
for (const contract of importData.contracts) {
await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.

View File

@ -9,7 +9,7 @@ import 'reflect-metadata';
import debug from 'debug';
import util from 'util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -48,7 +48,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -7,7 +7,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs } from '@vulcanize/util';
import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../../database';
@ -40,7 +40,16 @@ export const handler = async (argv: any): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -6,8 +6,9 @@ import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -47,6 +48,7 @@ const main = async (): Promise<void> => {
},
startingBlock: {
type: 'number',
default: 1,
describe: 'Starting block'
}
}).argv;
@ -62,7 +64,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -191,15 +191,10 @@ export class Database implements DatabaseInterface {
return repo.save(entity);
}
async getContracts (where: FindConditions<Contract>): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return repo.find({ where });
}
async getContract (address: string): Promise<Contract | undefined> {
async getContracts (): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return this._baseDatabase.getContract(repo, address);
return this._baseDatabase.getContracts(repo);
}
async createTransactionRunner (): Promise<QueryRunner> {
@ -223,10 +218,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@ -236,12 +231,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
}
async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
});
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
}
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@ -291,10 +284,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
}
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {

View File

@ -93,16 +93,22 @@ export class EventWatcher implements EventWatcherInterface {
return;
}
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
});

View File

@ -53,14 +53,6 @@ export const main = async (): Promise<any> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -70,6 +62,15 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);

View File

@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers';
import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, UNKNOWN_EVENT_NAME, ServerConfig } from '@vulcanize/util';
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, UNKNOWN_EVENT_NAME, ServerConfig, JobQueue } from '@vulcanize/util';
import { GraphWatcher } from '@vulcanize/graph-node';
import { Database } from './database';
@ -117,7 +117,7 @@ export class Indexer implements IndexerInterface {
_ipfsClient: IPFSClient
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
@ -127,7 +127,7 @@ export class Indexer implements IndexerInterface {
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
this._graphWatcher = graphWatcher;
this._abiMap = new Map();
@ -281,7 +281,7 @@ export class Indexer implements IndexerInterface {
const { data: { blockHash, blockNumber } } = job;
// Get all the contracts.
const contracts = await this._db.getContracts({});
const contracts = await this._db.getContracts();
// For each contract, merge the diff till now to create a checkpoint.
for (const contract of contracts) {
@ -976,24 +976,6 @@ export class Indexer implements IndexerInterface {
};
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise<boolean> {
// Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address.
// If a contract identifier is passed as address instead, no need to convert to checksum address.
// Customize: use the kind input to filter out non-contract-address input to address.
const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address);
if (!startingBlock) {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
startingBlock = syncStatus.latestIndexedBlockNumber;
}
await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock);
return true;
}
async getHookStatus (): Promise<HookStatus | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
@ -1035,6 +1017,14 @@ export class Indexer implements IndexerInterface {
return this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -1055,6 +1045,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getSyncStatus();
}
async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
@ -1067,10 +1061,6 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
@ -1099,8 +1089,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {

View File

@ -63,21 +63,12 @@ export class JobRunner {
if (kind === JOB_KIND_INDEX) {
await this._indexer.processBlock(blockHash);
}
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
await this._indexer.processEvent(event);
}
await this._jobQueue.markComplete(job);
await this._baseJobRunner.processEvent(job);
});
}
@ -141,14 +132,6 @@ export const main = async (): Promise<any> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -158,6 +141,14 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -51,10 +51,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},
Mutation: {
watchContract: (_: any, { address, kind, checkpoint, startingBlock }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise<boolean> => {
watchContract: async (_: any, { address, kind, checkpoint, startingBlock = 1 }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise<boolean> => {
log('watchContract', address, kind, checkpoint, startingBlock);
await indexer.watchContract(address, kind, checkpoint, startingBlock);
return indexer.watchContract(address, kind, checkpoint, startingBlock);
return true;
}
},

View File

@ -51,10 +51,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -64,6 +60,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {

View File

@ -10,6 +10,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@
import { Database } from '../database';
import { Indexer } from '../indexer';
import { CONTRACT_KIND } from '../utils/index';
(async () => {
const argv = await yargs.parserConfiguration({
@ -60,7 +61,7 @@ import { Indexer } from '../indexer';
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode);
await indexer.watchContract(argv.address, argv.checkpoint, argv.startingBlock);
await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock);
await db.close();
await jobQueue.stop();

View File

@ -29,8 +29,6 @@ const ETH_CALL_MODE = 'eth_call';
const TRANSFER_EVENT = 'Transfer';
const APPROVAL_EVENT = 'Approval';
const CONTRACT_KIND = 'token';
interface EventResult {
event: {
from?: string;
@ -301,8 +299,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.isWatchedContract(address);
}
async watchContract (address: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, CONTRACT_KIND, checkpoint, startingBlock);
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {

View File

@ -10,6 +10,7 @@ import { ValueResult } from '@vulcanize/util';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { CONTRACT_KIND } from './utils/index';
const log = debug('vulcanize:resolver');
@ -36,7 +37,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
Mutation: {
watchToken: async (_: any, { token, checkpoint = false, startingBlock = 1 }: { token: string, checkpoint: boolean, startingBlock: number }): Promise<boolean> => {
log('watchToken', token, checkpoint, startingBlock);
await indexer.watchContract(token, checkpoint, startingBlock);
await indexer.watchContract(token, CONTRACT_KIND, checkpoint, startingBlock);
return true;
}

View File

@ -10,6 +10,8 @@ import ERC20SymbolBytesABI from '../artifacts/ERC20SymbolBytes.json';
import ERC20NameBytesABI from '../artifacts/ERC20NameBytes.json';
import { StaticTokenDefinition } from './static-token-definition';
export const CONTRACT_KIND = 'token';
export const fetchTokenSymbol = async (ethProvider: BaseProvider, blockHash: string, tokenAddress: string): Promise<string> => {
const contract = new Contract(tokenAddress, abi, ethProvider);
const contractSymbolBytes = new Contract(tokenAddress, ERC20SymbolBytesABI, ethProvider);

View File

@ -8,6 +8,7 @@ dataSources:
source:
address: ""
abi: Example1
startBlock: 100
mapping:
kind: ethereum/events
apiVersion: 0.0.5

View File

@ -6,8 +6,9 @@ import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -50,7 +51,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -9,7 +9,7 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
@ -48,7 +48,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
@ -59,7 +68,7 @@ const main = async (): Promise<void> => {
ipldCheckpoints: []
};
const contracts = await db.getContracts({});
const contracts = await db.getContracts();
// Get latest canonical block.
const block = await indexer.getLatestCanonicalBlock();

View File

@ -55,10 +55,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -69,6 +65,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
// Import data.
@ -91,7 +92,7 @@ export const main = async (): Promise<any> => {
// Fill the Contracts.
for (const contract of importData.contracts) {
await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock);
}
// Get the snapshot block.

View File

@ -9,7 +9,7 @@ import 'reflect-metadata';
import debug from 'debug';
import util from 'util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -48,7 +48,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -7,7 +7,7 @@ import debug from 'debug';
import { MoreThan } from 'typeorm';
import assert from 'assert';
import { getConfig, initClients, resetJobs } from '@vulcanize/util';
import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../../database';
@ -43,7 +43,16 @@ export const handler = async (argv: any): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -6,8 +6,9 @@ import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../database';
@ -47,6 +48,7 @@ const main = async (): Promise<void> => {
},
startingBlock: {
type: 'number',
default: 1,
describe: 'Starting block'
}
}).argv;
@ -62,7 +64,16 @@ const main = async (): Promise<void> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();

View File

@ -225,15 +225,10 @@ export class Database implements DatabaseInterface {
return repo.save(entity);
}
async getContracts (where: FindConditions<Contract>): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return repo.find({ where });
}
async getContract (address: string): Promise<Contract | undefined> {
async getContracts (): Promise<Contract[]> {
const repo = this._conn.getRepository(Contract);
return this._baseDatabase.getContract(repo, address);
return this._baseDatabase.getContracts(repo);
}
async createTransactionRunner (): Promise<QueryRunner> {
@ -257,10 +252,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEventEntity(repo, entity);
}
async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
async getBlockEvents (blockHash: string, options: FindManyOptions<Event>): Promise<Event[]> {
const repo = this._conn.getRepository(Event);
return this._baseDatabase.getBlockEvents(repo, blockHash, where);
return this._baseDatabase.getBlockEvents(repo, blockHash, options);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@ -270,12 +265,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
}
async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Contract);
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
const repo = queryRunner.manager.getRepository(Contract);
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
});
return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock);
}
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
@ -325,10 +318,10 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.getBlockProgress(repo, blockHash);
}
async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex);
}
async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void> {

View File

@ -93,16 +93,22 @@ export class EventWatcher implements EventWatcherInterface {
return;
}
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
// Cannot publish individual event as they are processed together in a single job.
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) {
// Check for max acceptable lag time between request and sending results to live subscribers.
if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
} else {
log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
}
}
}
});

View File

@ -53,14 +53,6 @@ export const main = async (): Promise<any> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -70,6 +62,15 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv);

View File

@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers';
import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType } from '@vulcanize/util';
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType, JobQueue } from '@vulcanize/util';
import { GraphWatcher } from '@vulcanize/graph-node';
import { Database } from './database';
@ -87,7 +87,7 @@ export class Indexer implements IndexerInterface {
_ipfsClient: IPFSClient
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) {
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
assert(db);
assert(ethClient);
assert(postgraphileClient);
@ -97,7 +97,7 @@ export class Indexer implements IndexerInterface {
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
this._graphWatcher = graphWatcher;
const { abi, storageLayout } = artifacts;
@ -271,8 +271,10 @@ export class Indexer implements IndexerInterface {
const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint');
// There should be an initial checkpoint at least.
// Assumption: There should be no events for the contract at the starting block.
assert(checkpoint, 'Initial checkpoint doesn\'t exist');
// Return if initial checkpoint doesn't exist.
if (!checkpoint) {
return;
}
// Check if the latest checkpoint is in the same block.
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.');
@ -289,7 +291,7 @@ export class Indexer implements IndexerInterface {
const { data: { blockHash, blockNumber } } = job;
// Get all the contracts.
const contracts = await this._db.getContracts({});
const contracts = await this._db.getContracts();
// For each contract, merge the diff till now to create a checkpoint.
for (const contract of contracts) {
@ -299,7 +301,7 @@ export class Indexer implements IndexerInterface {
const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint');
if (!checkpointBlock) {
if (blockNumber === contract.startingBlock) {
if (blockNumber >= contract.startingBlock) {
// Call initial checkpoint hook.
await createInitialCheckpoint(this, contract.address, blockHash);
}
@ -585,24 +587,6 @@ export class Indexer implements IndexerInterface {
};
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise<boolean> {
// Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address.
// If a contract identifier is passed as address instead, no need to convert to checksum address.
// Customize: use the kind input to filter out non-contract-address input to address.
const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address);
if (!startingBlock) {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
startingBlock = syncStatus.latestIndexedBlockNumber;
}
await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock);
return true;
}
async getHookStatus (): Promise<HookStatus | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
@ -644,6 +628,14 @@ export class Indexer implements IndexerInterface {
return this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
async saveEventEntity (dbEvent: Event): Promise<Event> {
return this._baseIndexer.saveEventEntity(dbEvent);
}
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
@ -664,6 +656,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getSyncStatus();
}
async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
}
@ -676,10 +672,6 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}
@ -708,8 +700,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.markBlocksAsPruned(blocks);
}
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {

View File

@ -63,21 +63,12 @@ export class JobRunner {
if (kind === JOB_KIND_INDEX) {
await this._indexer.processBlock(blockHash);
}
await this._jobQueue.markComplete(job);
});
}
async subscribeEventProcessingQueue (): Promise<void> {
await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
const event = await this._baseJobRunner.processEvent(job);
const watchedContract = await this._indexer.isWatchedContract(event.contract);
if (watchedContract) {
await this._indexer.processEvent(event);
}
await this._jobQueue.markComplete(job);
await this._baseJobRunner.processEvent(job);
});
}
@ -141,11 +132,6 @@ export const main = async (): Promise<any> => {
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath);
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -155,6 +141,14 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();
const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
await jobRunner.start();
};

View File

@ -12,6 +12,7 @@ import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { ExampleEntity } from './entity/ExampleEntity';
import { RelatedEntity } from './entity/RelatedEntity';
const log = debug('vulcanize:resolver');
@ -36,10 +37,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
},
Mutation: {
watchContract: (_: any, { address, kind, checkpoint, startingBlock }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise<boolean> => {
watchContract: async (_: any, { address, kind, checkpoint, startingBlock = 1 }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise<boolean> => {
log('watchContract', address, kind, checkpoint, startingBlock);
await indexer.watchContract(address, kind, checkpoint, startingBlock);
return indexer.watchContract(address, kind, checkpoint, startingBlock);
return true;
}
},
@ -54,6 +56,12 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
return indexer._test(blockHash, contractAddress);
},
relatedEntity: async (_: any, { id, blockHash }: { id: string, blockHash: string }): Promise<RelatedEntity | undefined> => {
log('relatedEntity', id, blockHash);
return indexer.getSubgraphEntity(RelatedEntity, id, blockHash);
},
exampleEntity: async (_: any, { id, blockHash }: { id: string, blockHash: string }): Promise<ExampleEntity | undefined> => {
log('exampleEntity', id, blockHash);

View File

@ -72,6 +72,7 @@ type Query {
eventsInRange(fromBlockNumber: Int!, toBlockNumber: Int!): [ResultEvent!]
getMethod(blockHash: String!, contractAddress: String!): ResultString!
_test(blockHash: String!, contractAddress: String!): ResultBigInt!
relatedEntity(id: String!, blockHash: String!): RelatedEntity!
exampleEntity(id: String!, blockHash: String!): ExampleEntity!
getStateByCID(cid: String!): ResultIPLDBlock
getState(blockHash: String!, contractAddress: String!, kind: String): ResultIPLDBlock
@ -82,6 +83,13 @@ enum EnumType {
choice2
}
type RelatedEntity {
id: ID!
paramBigInt: BigInt!
examples: [ExampleEntity!]!
bigIntArray: [BigInt!]!
}
type ExampleEntity {
id: ID!
count: BigInt!
@ -91,7 +99,7 @@ type ExampleEntity {
paramBytes: Bytes!
paramEnum: EnumType!
paramBigDecimal: BigDecimal!
related: String!
related: RelatedEntity!
}
type Mutation {

View File

@ -51,10 +51,6 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
@ -64,6 +60,11 @@ export const main = async (): Promise<any> => {
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher);
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue);
if (watcherKind === KIND_ACTIVE) {

View File

@ -312,8 +312,10 @@ export class Indexer {
assert(this._db.saveContract);
const dbTx = await this._db.createTransactionRunner();
// Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress).
const contractAddress = ethers.utils.getAddress(address);
// Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address.
// If a contract identifier is passed as address instead, no need to convert to checksum address.
// Customize: use the kind input to filter out non-contract-address input to address.
const contractAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address);
try {
const contract = await this._db.saveContract(dbTx, contractAddress, kind, checkpoint, startingBlock);

View File

@ -73,7 +73,7 @@ export interface IndexerInterface {
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
cacheContract?: (contract: ContractInterface) => void;
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock?: number) => Promise<boolean>
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
}
export interface EventWatcherInterface {

View File

@ -4209,6 +4209,11 @@ binary-extensions@^2.0.0:
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d"
integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==
binaryen@101.0.0-nightly.20210723:
version "101.0.0-nightly.20210723"
resolved "https://registry.yarnpkg.com/binaryen/-/binaryen-101.0.0-nightly.20210723.tgz#b6bb7f3501341727681a03866c0856500eec3740"
integrity sha512-eioJNqhHlkguVSbblHOtLqlhtC882SOEPKmNFZaDuz1hzQjolxZ+eu3/kaS10n3sGPONsIZsO7R9fR00UyhEUA==
bindings@^1.2.1:
version "1.5.0"
resolved "https://registry.yarnpkg.com/bindings/-/bindings-1.5.0.tgz#10353c9e945334bc0511a6d90b38fbc7c9c504df"
@ -4216,11 +4221,6 @@ bindings@^1.2.1:
dependencies:
file-uri-to-path "1.0.0"
binaryen@101.0.0-nightly.20210723:
version "101.0.0-nightly.20210723"
resolved "https://registry.yarnpkg.com/binaryen/-/binaryen-101.0.0-nightly.20210723.tgz#b6bb7f3501341727681a03866c0856500eec3740"
integrity sha512-eioJNqhHlkguVSbblHOtLqlhtC882SOEPKmNFZaDuz1hzQjolxZ+eu3/kaS10n3sGPONsIZsO7R9fR00UyhEUA==
bip39@2.5.0:
version "2.5.0"
resolved "https://registry.yarnpkg.com/bip39/-/bip39-2.5.0.tgz#51cbd5179460504a63ea3c000db3f787ca051235"
@ -10646,6 +10646,13 @@ node-fetch@^2:
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.2.tgz#986996818b73785e47b1965cc34eb093a1d464d0"
integrity sha512-aLoxToI6RfZ+0NOjmWAgn9+LEd30YCkJKFSyWacNZdEKTit/ZMcKjGkTRo8uWEsnIb/hfKecNPEbln02PdWbcA==
node-fetch@^2.6.5:
version "2.6.6"
resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.6.tgz#1751a7c01834e8e1697758732e9efb6eeadfaf89"
integrity sha512-Z8/6vRlTUChSdIgMa51jxQ4lrw/Jy5SOW10ObaA47/RElsAN2c5Pn8bTgFGWn/ibwzXTE8qwr1Yzx28vsecXEA==
dependencies:
whatwg-url "^5.0.0"
"node-fetch@https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz":
version "2.6.7"
resolved "https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz#1b5d62978f2ed07b99444f64f0df39f960a6d34d"
@ -11863,7 +11870,7 @@ proto-list@~1.2.1:
resolved "https://registry.yarnpkg.com/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849"
integrity sha1-IS1b/hMYMGpCD2QCuOJv85ZHqEk=
protobufjs@^6.10.2:
protobufjs@^6.10.2, protobufjs@~6.11.0:
version "6.11.2"
resolved "https://registry.yarnpkg.com/protobufjs/-/protobufjs-6.11.2.tgz#de39fabd4ed32beaa08e9bb1e30d08544c1edf8b"
integrity sha512-4BQJoPooKJl2G9j3XftkIXjoC9C0Av2NOrWmbLWT1vH32GcSUHjM0Arra6UfTsVyfMAuFzaLucXn1sadxJydAw==