Address watcher block filler (#105)

* Address watcher block filler.

* Update docs.
This commit is contained in:
Ashwin Phatak 2021-06-28 17:12:53 +05:30 committed by GitHub
parent e9880693ad
commit a654b79df3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 169 additions and 11 deletions

View File

@ -21,6 +21,8 @@ sudo su - postgres
createdb erc20-watcher createdb erc20-watcher
``` ```
Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
Run the watcher: Run the watcher:
```bash ```bash

View File

@ -2,7 +2,14 @@
## Setup ## Setup
Enable the `pgcrypto` extension on the database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro). Create a postgres12 database for the job queue:
```
sudo su - postgres
createdb job-queue
```
Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).
Example: Example:
@ -17,3 +24,37 @@ job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION CREATE EXTENSION
job-queue=# exit job-queue=# exit
``` ```
Create a postgres12 database for the address watcher:
```
sudo su - postgres
createdb address-watcher
```
Update `environments/local.toml` with database connection settings for both the databases.
Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API, the `indexer-db` postgraphile and the tracing API (`debug_traceTransaction` RPC provider) endpoints.
## Run
Run the following scripts in different terminals.
GQL server:
```
yarn server
```
Job runner for processing the tracing requests queue:
```
yarn job-runner
```
To fill a block range:
```
yarn fill --startBlock 1 --endBlock 1000
```

View File

@ -3,6 +3,7 @@ import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'ty
import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import { Account } from './entity/Account'; import { Account } from './entity/Account';
import { BlockProgress } from './entity/BlockProgress';
import { Trace } from './entity/Trace'; import { Trace } from './entity/Trace';
export class Database { export class Database {
@ -93,4 +94,39 @@ export class Database {
.orderBy({ block_number: 'ASC' }) .orderBy({ block_number: 'ASC' })
.getMany(); .getMany();
} }
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
const repo = this._conn.getRepository(BlockProgress);
return repo.findOne({ where: { blockHash } });
}
async initBlockProgress (blockHash: string, blockNumber: number, numTx: number): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(BlockProgress);
const numRows = await repo
.createQueryBuilder()
.where('block_hash = :blockHash', { blockHash })
.getCount();
if (numRows === 0) {
const entity = repo.create({ blockHash, blockNumber, numTx, numTracedTx: 0, isComplete: (numTx === 0) });
await repo.save(entity);
}
});
}
async updateBlockProgress (blockHash: string): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(BlockProgress);
const entity = await repo.findOne({ where: { blockHash } });
if (entity && !entity.isComplete) {
entity.numTracedTx++;
if (entity.numTracedTx >= entity.numTx) {
entity.isComplete = true;
}
await repo.save(entity);
}
});
}
} }

View File

@ -0,0 +1,20 @@
import { Entity, PrimaryColumn, Column, Index } from 'typeorm';
@Entity()
@Index(['blockNumber'])
export class BlockProgress {
@PrimaryColumn('varchar', { length: 66 })
blockHash!: string;
@Column('numeric')
blockNumber!: number;
@Column('numeric')
numTx!: number;
@Column('numeric')
numTracedTx!: number;
@Column('boolean')
isComplete!: boolean
}

View File

@ -76,14 +76,21 @@ export const main = async (): Promise<any> => {
const { allEthHeaderCids: { nodes: blockNodes } } = result; const { allEthHeaderCids: { nodes: blockNodes } } = result;
for (let bi = 0; bi < blockNodes.length; bi++) { for (let bi = 0; bi < blockNodes.length; bi++) {
const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi]; const { blockHash, ethTransactionCidsByHeaderId: { nodes: txNodes } } = blockNodes[bi];
for (let ti = 0; ti < txNodes.length; ti++) { const blockProgress = await db.getBlockProgress(blockHash);
const { txHash } = txNodes[ti]; if (blockProgress) {
log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`); log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
} else {
await db.initBlockProgress(blockHash, blockNumber, txNodes.length);
// Never push appearances from fill jobs to GQL subscribers, as this command can be run multiple times for (let ti = 0; ti < txNodes.length; ti++) {
// for the same block range, and/or process the same block in multiple different runs spread over a const { txHash } = txNodes[ti];
// period of time. Also, the tx's are probably too old anyway for publishing. log(`Filling block number ${blockNumber}, block hash ${blockHash}, tx hash ${txHash}`);
await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: false });
// Never push appearances from fill jobs to GQL subscribers, as this command can be run multiple times
// for the same block range, and/or process the same block in multiple different runs spread over a
// period of time. Also, the tx's are probably too old anyway for publishing.
await jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: false, publishBlockProgress: true });
}
} }
} }
} }

View File

@ -10,6 +10,7 @@ import { addressesInTrace } from './util';
import { Database } from './database'; import { Database } from './database';
import { Trace } from './entity/Trace'; import { Trace } from './entity/Trace';
import { Account } from './entity/Account'; import { Account } from './entity/Account';
import { BlockProgress } from './entity/BlockProgress';
const log = debug('vulcanize:indexer'); const log = debug('vulcanize:indexer');
@ -50,9 +51,9 @@ export class Indexer {
async traceTxAndIndexAppearances (txHash: string): Promise<Trace> { async traceTxAndIndexAppearances (txHash: string): Promise<Trace> {
let entity = await this._db.getTrace(txHash); let entity = await this._db.getTrace(txHash);
if (entity) { if (entity) {
log('traceTx: db hit'); log(`traceTx: db hit ${txHash}`);
} else { } else {
log('traceTx: db miss, fetching from tracing API server'); log(`traceTx: db miss, fetching from tracing API server ${txHash}`);
const tx = await this._tracingClient.getTx(txHash); const tx = await this._tracingClient.getTx(txHash);
const trace = await this._tracingClient.getTxTrace(txHash, 'callTraceWithAddresses', '15s'); const trace = await this._tracingClient.getTxTrace(txHash, 'callTraceWithAddresses', '15s');
@ -77,6 +78,14 @@ export class Indexer {
return this._db.getAppearances(address, fromBlockNumber, toBlockNumber); return this._db.getAppearances(address, fromBlockNumber, toBlockNumber);
} }
async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
return this._db.getBlockProgress(blockHash);
}
async updateBlockProgress (blockHash: string): Promise<void> {
return this._db.updateBlockProgress(blockHash);
}
async _indexAppearances (trace: Trace): Promise<Trace> { async _indexAppearances (trace: Trace): Promise<Trace> {
const traceObj = JSON.parse(trace.trace); const traceObj = JSON.parse(trace.trace);

View File

@ -28,6 +28,10 @@ export const createResolvers = async (indexer: Indexer, txWatcher: TxWatcher): P
return payload.onAddressEvent.address === ethers.utils.getAddress(variables.address); return payload.onAddressEvent.address === ethers.utils.getAddress(variables.address);
} }
) )
},
onBlockProgressEvent: {
subscribe: () => txWatcher.getBlockProgressEventIterator()
} }
}, },

View File

@ -16,6 +16,14 @@ type WatchedAddressEvent {
txTrace: TxTrace! txTrace: TxTrace!
} }
type BlockProgressEvent {
blockNumber: Int!
blockHash: String!
numTx: Int!
numTracedTx: Int!
isComplete: Boolean!
}
# #
# Queries # Queries
# #
@ -48,6 +56,9 @@ type Subscription {
# Watch for address events (at head of chain). # Watch for address events (at head of chain).
onAddressEvent(address: String!): WatchedAddressEvent! onAddressEvent(address: String!): WatchedAddressEvent!
# Watch for block progress events from filler process.
onBlockProgressEvent: BlockProgressEvent!
} }
# #

View File

@ -7,10 +7,12 @@ import { EthClient } from '@vulcanize/ipld-eth-client';
import { Indexer } from './indexer'; import { Indexer } from './indexer';
import { JobQueue } from './job-queue'; import { JobQueue } from './job-queue';
import { BlockProgress } from './entity/BlockProgress';
const log = debug('vulcanize:tx-watcher'); const log = debug('vulcanize:tx-watcher');
export const AddressEvent = 'address-event'; export const AddressEvent = 'address-event';
export const BlockProgressEvent = 'block-progress-event';
export const QUEUE_TX_TRACING = 'tx-tracing'; export const QUEUE_TX_TRACING = 'tx-tracing';
export class TxWatcher { export class TxWatcher {
@ -31,6 +33,10 @@ export class TxWatcher {
return this._pubsub.asyncIterator([AddressEvent]); return this._pubsub.asyncIterator([AddressEvent]);
} }
getBlockProgressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([BlockProgressEvent]);
}
async start (): Promise<void> { async start (): Promise<void> {
assert(!this._watchTxSubscription, 'subscription already started'); assert(!this._watchTxSubscription, 'subscription already started');
@ -38,6 +44,13 @@ export class TxWatcher {
this._jobQueue.onComplete(QUEUE_TX_TRACING, async (job) => { this._jobQueue.onComplete(QUEUE_TX_TRACING, async (job) => {
const { data: { request, failed, state, createdOn } } = job; const { data: { request, failed, state, createdOn } } = job;
await this._indexer.updateBlockProgress(request.data.blockHash);
const blockProgress = await this._indexer.getBlockProgress(request.data.blockHash);
if (blockProgress && request.data.publishBlockProgress) {
await this.publishBlockProgressToSubscribers(blockProgress);
}
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
log(`Job onComplete tx ${request.data.txHash} publish ${!!request.data.publish}`); log(`Job onComplete tx ${request.data.txHash} publish ${!!request.data.publish}`);
if (!failed && state === 'completed' && request.data.publish) { if (!failed && state === 'completed' && request.data.publish) {
@ -53,7 +66,7 @@ export class TxWatcher {
this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => { this._watchTxSubscription = await this._ethClient.watchTransactions(async (value) => {
const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode'); const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2)); log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, publish: true }); await this._jobQueue.pushJob(QUEUE_TX_TRACING, { txHash, blockHash, publish: true });
}); });
} }
@ -85,6 +98,21 @@ export class TxWatcher {
} }
} }
async publishBlockProgressToSubscribers (blockProgress: BlockProgress): Promise<void> {
const { blockHash, blockNumber, numTx, numTracedTx, isComplete } = blockProgress;
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(BlockProgressEvent, {
onBlockProgressEvent: {
blockHash,
blockNumber,
numTx,
numTracedTx,
isComplete
}
});
}
async stop (): Promise<void> { async stop (): Promise<void> {
if (this._watchTxSubscription) { if (this._watchTxSubscription) {
log('Stopped watching upstream tx'); log('Stopped watching upstream tx');