Handle missing blocks in postgraphile by making RPC call (#296)

This commit is contained in:
nikugogoi 2021-12-03 16:23:11 +05:30 committed by GitHub
parent 32fea1f2cb
commit 63ce6fd55f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 94 additions and 76 deletions

View File

@ -69,7 +69,7 @@ export class Indexer {
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._postgraphileClient, this._ethProvider);
const { abi, storageLayout } = artifacts;
@ -248,8 +248,8 @@ export class Indexer {
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async getEvent (id: string): Promise<Event | undefined> {

View File

@ -29,13 +29,13 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { dbConfig, serverConfig, ethClient, ethProvider } = await getResetConfig(config);
const { dbConfig, serverConfig, ethClient, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
await db.init();
const indexer = new Indexer(db, ethClient, ethProvider, serverConfig.mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, serverConfig.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -77,7 +77,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');

View File

@ -44,6 +44,7 @@ interface EventResult {
export class Indexer {
_db: Database
_ethClient: EthClient
_postgraphileClient: EthClient
_ethProvider: BaseProvider
_baseIndexer: BaseIndexer
@ -52,15 +53,16 @@ export class Indexer {
_contract: ethers.utils.Interface
_serverMode: string
constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
constructor (db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
assert(db);
assert(ethClient);
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._serverMode = serverMode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._postgraphileClient, this._ethProvider);
const { abi, storageLayout } = artifacts;
@ -331,8 +333,8 @@ export class Indexer {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async getEvent (id: string): Promise<Event | undefined> {

View File

@ -101,8 +101,13 @@ export const main = async (): Promise<any> => {
cache
});
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, ethClient, ethProvider, mode);
const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -72,8 +72,14 @@ export class EthClient {
);
}
async getBlocksByNumber (blockNumber: number): Promise<any> {
return this._graphqlClient.query(ethQueries.getBlocksByNumber, { blockNumber });
async getBlocks ({ blockNumber, blockHash }: { blockNumber?: number, blockHash?: string }): Promise<any> {
return this._graphqlClient.query(
ethQueries.getBlocks,
{
blockNumber,
blockHash
}
);
}
async getBlockByHash (blockHash?: string): Promise<any> {

View File

@ -64,15 +64,19 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
}
`;
export const getBlocksByNumber = gql`
query allEthHeaderCids($blockNumber: BigInt) {
allEthHeaderCids(condition: { blockNumber: $blockNumber }) {
export const getBlocks = gql`
query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) {
nodes {
cid
blockNumber
blockHash
parentHash
timestamp
stateRoot
td
txRoot
receiptRoot
}
}
}
@ -127,7 +131,7 @@ export default {
getStorageAt,
getLogs,
getBlockWithTransactions,
getBlocksByNumber,
getBlocks,
getBlockByHash,
subscribeBlocks,
subscribeTransactions

View File

@ -38,7 +38,7 @@ export const builder = {
export const handler = async (argv: any): Promise<void> => {
const config = await getConfig(argv.configFile);
await resetJobs(config);
const { dbConfig, serverConfig, upstreamConfig, ethClient, ethProvider } = await getResetConfig(config);
const { dbConfig, serverConfig, upstreamConfig, postgraphileClient, ethProvider } = await getResetConfig(config);
// Initialize database.
const db = new Database(dbConfig);
@ -52,7 +52,7 @@ export const handler = async (argv: any): Promise<void> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, serverConfig.mode);
const indexer = new Indexer(db, uniClient, erc20Client, postgraphileClient, ethProvider, serverConfig.mode);
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -81,7 +81,7 @@ export const main = async (): Promise<any> => {
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
const indexer = new Indexer(db, uniClient, erc20Client, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;

View File

@ -42,21 +42,21 @@ export class Indexer implements IndexerInterface {
_db: Database
_uniClient: UniClient
_erc20Client: ERC20Client
_ethClient: EthClient
_postgraphileClient: EthClient
_baseIndexer: BaseIndexer
_isDemo: boolean
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, ethClient: EthClient, ethProvider: providers.BaseProvider, mode: string) {
constructor (db: Database, uniClient: UniClient, erc20Client: ERC20Client, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, mode: string) {
assert(db);
assert(uniClient);
assert(erc20Client);
assert(ethClient);
assert(postgraphileClient);
this._db = db;
this._uniClient = uniClient;
this._erc20Client = erc20Client;
this._ethClient = ethClient;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, ethProvider);
this._postgraphileClient = postgraphileClient;
this._baseIndexer = new BaseIndexer(this._db, this._postgraphileClient, ethProvider);
this._isDemo = mode === 'demo';
}
@ -147,7 +147,7 @@ export class Indexer implements IndexerInterface {
log('Event processing completed for', eventName);
}
async getBlocks (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> {
async getBlockEntities (where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise<any> {
if (where.timestamp_gt) {
where.blockTimestamp_gt = where.timestamp_gt;
delete where.timestamp_gt;
@ -327,8 +327,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async getEvent (id: string): Promise<Event | undefined> {

View File

@ -101,7 +101,6 @@ export const main = async (): Promise<any> => {
tokenWatcher,
cache: cacheConfig,
ethServer: {
gqlApiEndpoint,
gqlPostgraphileEndpoint,
rpcProviderEndpoint
}
@ -111,9 +110,9 @@ export const main = async (): Promise<any> => {
assert(gqlSubscriptionEndpoint, 'Missing upstream uniWatcher.gqlSubscriptionEndpoint');
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({
gqlEndpoint: gqlApiEndpoint,
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
const postgraphileClient = new EthClient({
gqlEndpoint: gqlPostgraphileEndpoint,
cache
});
@ -125,7 +124,7 @@ export const main = async (): Promise<any> => {
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
const indexer = new Indexer(db, uniClient, erc20Client, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -238,7 +238,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
blocks: async (_: any, { first, orderBy, orderDirection, where }: { first: number, orderBy: string, orderDirection: OrderDirection, where: { [key: string]: any } }) => {
log('blocks', first, orderBy, orderDirection, where);
return indexer.getBlocks(where, { limit: first, orderBy, orderDirection });
return indexer.getBlockEntities(where, { limit: first, orderBy, orderDirection });
},
indexingStatusForCurrentVersion: async (_: any, { subgraphName }: { subgraphName: string }) => {

View File

@ -82,7 +82,7 @@ export const main = async (): Promise<any> => {
const uniClient = new UniClient(uniWatcher);
const erc20Client = new ERC20Client(tokenWatcher);
const ethProvider = getCustomProvider(rpcProviderEndpoint);
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, ethProvider, mode);
const indexer = new Indexer(db, uniClient, erc20Client, postgraphileClient, ethProvider, mode);
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -51,7 +51,7 @@ export class Indexer implements IndexerInterface {
this._ethClient = ethClient;
this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider);
this._baseIndexer = new BaseIndexer(this._db, this._postgraphileClient, this._ethProvider);
this._factoryContract = new ethers.utils.Interface(factoryABI);
this._poolContract = new ethers.utils.Interface(poolABI);
@ -384,8 +384,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getSyncStatus();
}
async getBlock (blockHash: string): Promise<any> {
return this._baseIndexer.getBlock(blockHash);
async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any> {
return this._baseIndexer.getBlocks(blockFilter);
}
async getEvent (id: string): Promise<Event | undefined> {

View File

@ -1,7 +1,5 @@
import debug from 'debug';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { JOB_KIND_PRUNE, QUEUE_BLOCK_PROCESSING, JOB_KIND_INDEX } from './constants';
import { JobQueue } from './job-queue';
import { IndexerInterface } from './types';
@ -43,19 +41,17 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
export const processBlockByNumber = async (
jobQueue: JobQueue,
indexer: IndexerInterface,
ethClient: EthClient,
blockDelayInMilliSecs: number,
blockNumber: number
): Promise<void> => {
log(`Process block ${blockNumber}`);
while (true) {
const result = await ethClient.getBlocksByNumber(blockNumber);
const { allEthHeaderCids: { nodes: blockNodes } } = result;
const blocks = await indexer.getBlocks({ blockNumber });
if (blockNodes.length) {
for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
if (blocks.length) {
for (let bi = 0; bi < blocks.length; bi++) {
const { blockHash, blockNumber, parentHash, timestamp } = blocks[bi];
const blockProgress = await indexer.getBlockProgress(blockHash);
if (blockProgress) {

View File

@ -61,7 +61,7 @@ export class EventWatcher {
const { ethServer: { blockDelayInMilliSecs } } = this._upstreamConfig;
processBlockByNumber(this._jobQueue, this._indexer, this._postgraphileClient, blockDelayInMilliSecs, blockNumber + 1);
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1);
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -76,7 +76,7 @@ export class EventWatcher {
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
if (isComplete) {
processBlockByNumber(this._jobQueue, this._indexer, this._postgraphileClient, blockDelayInMilliSecs, blockNumber + 1);
processBlockByNumber(this._jobQueue, this._indexer, blockDelayInMilliSecs, blockNumber + 1);
}
}
}

View File

@ -34,7 +34,7 @@ export const fillBlocks = async (
currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1;
}
processBlockByNumber(jobQueue, indexer, ethClient, blockDelayInMilliSecs, currentBlockNumber);
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
@ -55,7 +55,7 @@ export const fillBlocks = async (
}
currentBlockNumber++;
processBlockByNumber(jobQueue, indexer, ethClient, blockDelayInMilliSecs, currentBlockNumber);
processBlockByNumber(jobQueue, indexer, blockDelayInMilliSecs, currentBlockNumber);
}
}
};

View File

@ -14,7 +14,6 @@ import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusIn
import { UNKNOWN_EVENT_NAME } from './constants';
const MAX_EVENTS_BLOCK_RANGE = 1000;
const MISSING_BLOCKS_ERROR = 'sql: no rows in result set';
const log = debug('vulcanize:indexer');
@ -27,15 +26,15 @@ export interface ValueResult {
export class Indexer {
_db: DatabaseInterface;
_ethClient: EthClient;
_postgraphileClient: EthClient;
_getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider;
constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider) {
this._db = db;
this._ethClient = ethClient;
this._postgraphileClient = ethClient;
this._ethProvider = ethProvider;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._getStorageAt = this._postgraphileClient.getStorageAt.bind(this._postgraphileClient);
}
async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
@ -106,29 +105,27 @@ export class Indexer {
return res;
}
async getBlock (blockHash: string): Promise<any> {
try {
const { block } = await this._ethClient.getBlockByHash(blockHash);
return block;
} catch (error) {
// If block is not present in header_cids, eth_getBlockByHash call is made to update statediff.
if (error instanceof Error && error.message === MISSING_BLOCKS_ERROR) {
try {
await this._ethProvider.getBlock(blockHash);
} catch (error: any) {
// eth_getBlockByHash will update statediff but takes some time.
// The block is not returned immediately and an error is thrown so that it is fetched in the next job retry.
if (error.code === ethers.utils.Logger.errors.SERVER_ERROR) {
throw new Error('Block not found');
}
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
assert(blockFilter.blockHash || blockFilter.blockNumber);
const result = await this._postgraphileClient.getBlocks(blockFilter);
const { allEthHeaderCids: { nodes: blocks } } = result;
if (!blocks.length) {
try {
const blockHashOrNumber = blockFilter.blockHash || blockFilter.blockNumber as string | number;
await this._ethProvider.getBlock(blockHashOrNumber);
} catch (error: any) {
// eth_getBlockByHash will update statediff but takes some time.
// The block is not returned immediately and an error is thrown so that it is fetched in the next job retry.
if (error.code !== ethers.utils.Logger.errors.SERVER_ERROR) {
throw error;
}
}
throw error;
log('Block not found. Fetching block after eth_call.');
}
}
return blocks;
}
async getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined> {

View File

@ -131,7 +131,16 @@ export class JobRunner {
const parent = await this._indexer.getBlockProgress(parentHash);
if (!parent) {
const { number: parentBlockNumber, parent: grandparent, timestamp: parentTimestamp } = await this._indexer.getBlock(parentHash);
const blocks = await this._indexer.getBlocks({ blockHash: parentHash });
if (!blocks.length) {
const message = `No blocks at parentHash ${parentHash}, aborting`;
log(message);
throw new Error(message);
}
const [{ blockNumber: parentBlockNumber, parentHash: grandparentHash, timestamp: parentTimestamp }] = blocks;
// Create a higher priority job to index parent block and then abort.
// We don't have to worry about aborting as this job will get retried later.
@ -140,7 +149,7 @@ export class JobRunner {
kind: JOB_KIND_INDEX,
blockHash: parentHash,
blockNumber: parentBlockNumber,
parentHash: grandparent?.hash,
parentHash: grandparentHash,
timestamp: parentTimestamp,
priority: newPriority
}, { priority: newPriority });

View File

@ -50,7 +50,7 @@ export interface IndexerInterface {
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
getEvent (id: string): Promise<EventInterface | undefined>
getSyncStatus (): Promise<SyncStatusInterface | undefined>;
getBlock (blockHash: string): Promise<any>
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>