Update code for log_cids change in vdb core (#243)

* Remove fields no longer present in schema.
* Refactor code to work with log_cids change in eth-server.
* Refactor process event to save events from logs.
* Use constants for erc20 event names.
* Implement watch blocks in erc20-watcher similar to uni-watcher.
* Moved common methods to util.
* Implement eventsInRange query in erc20-watcher.
* Filter unknown event in database query.
* Change dependencies version to be same in all packages.

Co-authored-by: nabarun <nabarun@deepstacksoft.com>

parent 0a96bbd94d
commit 4f8f1d8cd7

README.md (13 changes)
@@ -46,11 +46,24 @@ createdb uni-info-watcher

Create the databases for the job queues and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro):

```
createdb erc20-watcher-job-queue
createdb address-watcher-job-queue
createdb uni-watcher-job-queue
createdb uni-info-watcher-job-queue
```

```
postgres@tesla:~$ psql -U postgres -h localhost erc20-watcher-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

erc20-watcher-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
erc20-watcher-job-queue=# exit
```

```
postgres@tesla:~$ psql -U postgres -h localhost address-watcher-job-queue
Password for user postgres:
@@ -2,15 +2,38 @@

## Setup

Create a postgres12 database and provide connection settings in `environments/local.toml`.
Create a postgres12 database for the job queue:

For example:
```
sudo su - postgres
createdb erc20-watcher-job-queue
```

Enable the `pgcrypto` extension on the job queue database (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro).

Example:

```
postgres@tesla:~$ psql -U postgres -h localhost erc20-watcher-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

erc20-watcher-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
erc20-watcher-job-queue=# exit
```

Create a postgres12 database for the erc20 watcher:

```
sudo su - postgres
createdb erc20-watcher
```

Update `environments/local.toml` with database connection settings for both databases.

Update the `upstream` config in `environments/local.toml` and provide the `ipld-eth-server` GQL API and the `indexer-db` postgraphile endpoints.
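For reference, the combined settings might look like the sketch below. The `[jobQueue]` values and the `ethServer` endpoint key names (`gqlApiEndpoint`, `gqlPostgraphileEndpoint`, `rpcProviderEndpoint`) appear in this commit's config and code; the `[database]` keys and all hosts/ports are illustrative assumptions for a local setup, not prescribed values.

```toml
# Sketch only; adjust names and ports for your deployment.
[database]
  type = "postgres"
  host = "localhost"
  port = 5432
  database = "erc20-watcher"
  username = "postgres"
  password = "postgres"

[upstream]
  [upstream.ethServer]
    # ipld-eth-server GQL API (port is an assumption)
    gqlApiEndpoint = "http://127.0.0.1:8083/graphql"
    # indexer-db postgraphile endpoint (port is an assumption)
    gqlPostgraphileEndpoint = "http://127.0.0.1:5000/graphql"
    rpcProviderEndpoint = "http://127.0.0.1:8545"

[jobQueue]
  dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue"
  maxCompletionLagInSecs = 300
```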
## Run

@@ -24,17 +47,59 @@ yarn build

Run the watcher:

```bash
yarn run server
$ yarn server

# For development.
yarn run server:dev
$ yarn server:dev

# For specifying config file.
yarn run server -f environments/local.toml
$ yarn server -f environments/local.toml
```

Start the job runner:

```bash
$ yarn job-runner

# For development.
$ yarn job-runner:dev

# For specifying config file.
$ yarn job-runner -f environments/local.toml
```

GQL console: http://localhost:3001/graphql

Start watching a token:

```bash
$ yarn watch:contract --address 0xTokenAddress --startingBlock <start-block>

# For specifying config file.
$ yarn watch:contract -f environments/local.toml --address 0xTokenAddress --startingBlock <start-block>
```

Example:

```bash
$ yarn watch:contract --address 0xfE0034a874c2707c23F91D7409E9036F5e08ac34 --startingBlock 100
```

To fill a block range:

```bash
$ yarn fill --startBlock <from-block> --endBlock <to-block>

# For specifying config file.
$ yarn fill -f environments/local.toml --startBlock <from-block> --endBlock <to-block>
```

Example:

```bash
$ yarn fill --startBlock 1000 --endBlock 2000
```

### Example GQL Queries

```text
@@ -23,3 +23,8 @@

name = "requests"
enabled = false
deleteOnStart = false

[jobQueue]
dbConnectionString = "postgres://postgres:postgres@localhost/erc20-watcher-job-queue"
maxCompletionLagInSecs = 300
jobDelayInMilliSecs = 100
@@ -6,6 +6,8 @@ import '@nomiclabs/hardhat-waffle';

import './test/tasks/token-deploy';
import './test/tasks/token-transfer';
import './test/tasks/token-approve';
import './test/tasks/token-transfer-from';
import './test/tasks/block-latest';

// You need to export an object to set up your config
@@ -11,12 +11,20 @@

    "server": "DEBUG=vulcanize:* node --enable-source-maps dist/server.js",
    "server:dev": "DEBUG=vulcanize:* nodemon --watch src src/server.ts",
    "server:mock": "MOCK=1 nodemon src/server.ts",
    "job-runner": "DEBUG=vulcanize:* node --enable-source-maps dist/job-runner.js",
    "job-runner:dev": "DEBUG=vulcanize:* nodemon --watch src src/job-runner.ts",
    "watch:contract": "node --enable-source-maps dist/cli/watch-contract.js",
    "watch:contract:dev": "ts-node src/cli/watch-contract.ts",
    "fill": "DEBUG=vulcanize:* node dist/fill.js",
    "fill:dev": "DEBUG=vulcanize:* ts-node src/fill.ts",
    "token:deploy": "hardhat --network localhost token-deploy",
    "token:deploy:docker": "hardhat --network docker token-deploy",
    "token:transfer": "hardhat --network localhost token-transfer",
    "token:transfer:docker": "hardhat --network docker token-transfer",
    "token:approve": "hardhat --network localhost token-approve",
    "token:approve:docker": "hardhat --network docker token-approve",
    "token:transfer-from": "hardhat --network localhost token-transfer-from",
    "token:transfer-from:docker": "hardhat --network docker token-transfer-from",
    "block:latest": "hardhat --network localhost block-latest",
    "block:latest:docker": "hardhat --network docker block-latest"
  },
@@ -3,7 +3,7 @@
//

import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial } from 'typeorm';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner } from 'typeorm';
import path from 'path';

import { Database as BaseDatabase } from '@vulcanize/util';
@@ -12,7 +12,10 @@ import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { EventSyncProgress } from './entity/EventProgress';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';

const CONTRACT_KIND = 'token';

export class Database {
  _config: ConnectionOptions
@@ -73,87 +76,112 @@ export class Database {
    return repo.save(entity);
  }

  // Returns true if events have already been synced for the (block, token) combination.
  async didSyncEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<boolean> {
    const numRows = await this._conn.getRepository(EventSyncProgress)
      .createQueryBuilder()
      .where('block_hash = :blockHash AND token = :token', {
        blockHash,
        token
      })
      .getCount();
  async getContract (address: string): Promise<Contract | undefined> {
    const repo = this._conn.getRepository(Contract);

    return numRows > 0;
    return this._baseDatabase.getContract(repo, address);
  }

  async getEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<Event[]> {
    return this._conn.getRepository(Event)
      .createQueryBuilder('event')
      .where('block_hash = :blockHash AND token = :token', {
        blockHash,
        token
      })
      .addOrderBy('id', 'ASC')
      .getMany();
  async createTransactionRunner (): Promise<QueryRunner> {
    return this._baseDatabase.createTransactionRunner();
  }

  async getEventsByName ({ blockHash, token, eventName }: { blockHash: string, token: string, eventName: string }): Promise<Event[] | undefined> {
    return this._conn.getRepository(Event)
      .createQueryBuilder('event')
      .where('block_hash = :blockHash AND token = :token AND event_name = :eventName', {
        blockHash,
        token,
        eventName
      })
      .getMany();
  async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
    const repo = this._conn.getRepository(BlockProgress);

    return this._baseDatabase.getProcessedBlockCountForRange(repo, fromBlockNumber, toBlockNumber);
  }

  async saveEvents ({ blockHash, token, events }: { blockHash: string, token: string, events: DeepPartial<Event>[] }): Promise<void> {
    // In a transaction:
    // (1) Save all the events in the database.
    // (2) Add an entry to the event progress table.
  async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
    const repo = this._conn.getRepository(Event);

    await this._conn.transaction(async (tx) => {
      const repo = tx.getRepository(EventSyncProgress);

      // Check sync progress inside the transaction.
      const numRows = await repo
        .createQueryBuilder()
        .where('block_hash = :blockHash AND token = :token', {
          blockHash,
          token
        })
        .getCount();

      if (numRows === 0) {
        // Bulk insert events.
        await tx.createQueryBuilder()
          .insert()
          .into(Event)
          .values(events)
          .execute();

        // Update event sync progress.
        const progress = repo.create({ blockHash, token });
        await repo.save(progress);
      }
    });
    return this._baseDatabase.getEventsInRange(repo, fromBlockNumber, toBlockNumber);
  }

  async isWatchedContract (address: string): Promise<boolean> {
    const numRows = await this._conn.getRepository(Contract)
      .createQueryBuilder()
      .where('address = :address', { address })
      .getCount();
  async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise<Event> {
    const repo = queryRunner.manager.getRepository(Event);
    return this._baseDatabase.saveEventEntity(repo, entity);
  }

    return numRows > 0;
  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
    const repo = this._conn.getRepository(Event);

    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
  }

  async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
    const blockRepo = queryRunner.manager.getRepository(BlockProgress);
    const eventRepo = queryRunner.manager.getRepository(Event);

    return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
  }

  async saveContract (address: string, startingBlock: number): Promise<void> {
    await this._conn.transaction(async (tx) => {
      const repo = tx.getRepository(Contract);

      return this._baseDatabase.saveContract(repo, address, startingBlock);
      return this._baseDatabase.saveContract(repo, address, startingBlock, CONTRACT_KIND);
    });
  }

  async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
    const repo = queryRunner.manager.getRepository(SyncStatus);

    return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
  }

  async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
    const repo = queryRunner.manager.getRepository(SyncStatus);

    return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
  }

  async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
    const repo = queryRunner.manager.getRepository(SyncStatus);

    return this._baseDatabase.updateSyncStatusChainHead(repo, blockHash, blockNumber);
  }

  async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
    const repo = queryRunner.manager.getRepository(SyncStatus);

    return this._baseDatabase.getSyncStatus(repo);
  }

  async getEvent (id: string): Promise<Event | undefined> {
    const repo = this._conn.getRepository(Event);

    return this._baseDatabase.getEvent(repo, id);
  }

  async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
    const repo = this._conn.getRepository(BlockProgress);

    return this._baseDatabase.getBlocksAtHeight(repo, height, isPruned);
  }

  async markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgress[]): Promise<void> {
    const repo = queryRunner.manager.getRepository(BlockProgress);

    return this._baseDatabase.markBlocksAsPruned(repo, blocks);
  }

  async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
    const repo = this._conn.getRepository(BlockProgress);
    return this._baseDatabase.getBlockProgress(repo, blockHash);
  }

  async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void> {
    const repo = queryRunner.manager.getRepository(BlockProgress);

    return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex);
  }

  async removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void> {
    return this._baseDatabase.removeEntities(queryRunner, entity, findConditions);
  }

  async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
    return this._baseDatabase.getAncestorAtDepth(blockHash, depth);
  }
}
packages/erc20-watcher/src/entity/BlockProgress.ts (new file, 43 lines)
@@ -0,0 +1,43 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';

import { BlockProgressInterface } from '@vulcanize/util';

@Entity()
@Index(['blockHash'], { unique: true })
@Index(['blockNumber'])
@Index(['parentHash'])
export class BlockProgress implements BlockProgressInterface {
  @PrimaryGeneratedColumn()
  id!: number;

  @Column('varchar', { length: 66 })
  blockHash!: string;

  @Column('varchar', { length: 66 })
  parentHash!: string;

  @Column('integer')
  blockNumber!: number;

  @Column('integer')
  blockTimestamp!: number;

  @Column('integer')
  numEvents!: number;

  @Column('integer')
  numProcessedEvents!: number;

  @Column('integer')
  lastProcessedEventIndex!: number;

  @Column('boolean')
  isComplete!: boolean

  @Column('boolean', { default: false })
  isPruned!: boolean
}
@@ -13,6 +13,9 @@ export class Contract {
  @Column('varchar', { length: 42 })
  address!: string;

  @Column('varchar', { length: 8 })
  kind!: string;

  @Column('integer')
  startingBlock!: number;
}
@@ -2,48 +2,42 @@
// Copyright 2021 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
import { BlockProgress } from './BlockProgress';

export const UNKNOWN_EVENT_NAME = '__unknown__';

@Entity()
// Index to query all events for a contract efficiently.
@Index(['blockHash', 'token'])
// Index to query 'Transfer' events efficiently.
@Index(['blockHash', 'token', 'eventName', 'transferFrom', 'transferTo'])
// Index to query 'Approval' events efficiently.
@Index(['blockHash', 'token', 'eventName', 'approvalOwner', 'approvalSpender'])
@Index(['block', 'contract'])
// Index to query events by name efficiently.
@Index(['block', 'contract', 'eventName'])
export class Event {
  @PrimaryGeneratedColumn()
  id!: number;

  @ManyToOne(() => BlockProgress)
  block!: BlockProgress;

  @Column('varchar', { length: 66 })
  blockHash!: string;
  txHash!: string;

  // Index of the log in the block.
  @Column('integer')
  index!: number;

  @Column('varchar', { length: 42 })
  token!: string;
  contract!: string;

  @Column('varchar', { length: 256 })
  eventName!: string;

  @Column('text')
  eventInfo!: string;

  @Column('text')
  extraInfo!: string;

  @Column('text')
  proof!: string;

  // Transfer event columns.
  @Column('varchar', { length: 42, nullable: true })
  transferFrom!: string;

  @Column('varchar', { length: 42, nullable: true })
  transferTo!: string;

  @Column('numeric', { nullable: true })
  transferValue!: bigint;

  // Approval event columns.
  @Column('varchar', { length: 42, nullable: true })
  approvalOwner!: string;

  @Column('varchar', { length: 42, nullable: true })
  approvalSpender!: string;

  @Column('numeric', { nullable: true })
  approvalValue!: bigint;
}
@@ -1,24 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';

// Stores a row if events for a (block, token) combination have already been fetched.
//
// Required as a particular block may not have events from a particular contract,
// and we need to differentiate between that case and the case where data hasn't
// yet been synced from upstream.
//
@Entity()
@Index(['blockHash', 'token'], { unique: true })
export class EventSyncProgress {
  @PrimaryGeneratedColumn()
  id!: number;

  @Column('varchar', { length: 66 })
  blockHash!: string;

  @Column('varchar', { length: 42 })
  token!: string;
}

packages/erc20-watcher/src/entity/SyncStatus.ts (new file, 37 lines)
@@ -0,0 +1,37 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

import { SyncStatusInterface } from '@vulcanize/util';

@Entity()
export class SyncStatus implements SyncStatusInterface {
  @PrimaryGeneratedColumn()
  id!: number;

  // Latest block hash and number from the chain itself.
  @Column('varchar', { length: 66 })
  chainHeadBlockHash!: string;

  @Column('integer')
  chainHeadBlockNumber!: number;

  // Most recent block hash that's been indexed.
  @Column('varchar', { length: 66 })
  latestIndexedBlockHash!: string;

  // Most recent block number that's been indexed.
  @Column('integer')
  latestIndexedBlockNumber!: number;

  // Most recent block hash and number that we can consider as part
  // of the canonical/finalized chain. Reorgs older than this block
  // cannot be processed and processing will halt.
  @Column('varchar', { length: 66 })
  latestCanonicalBlockHash!: string;

  @Column('integer')
  latestCanonicalBlockNumber!: number;
}
@@ -4,11 +4,20 @@

import assert from 'assert';
import debug from 'debug';
import _ from 'lodash';
import { PubSub } from 'apollo-server-express';

import { EthClient } from '@vulcanize/ipld-eth-client';
import {
  JobQueue,
  EventWatcher as BaseEventWatcher,
  QUEUE_BLOCK_PROCESSING,
  QUEUE_EVENT_PROCESSING
} from '@vulcanize/util';

import { Indexer } from './indexer';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';

const EVENT = 'event';

const log = debug('vulcanize:events');

@@ -16,48 +25,88 @@ export class EventWatcher {
  _ethClient: EthClient
  _indexer: Indexer
  _subscription: ZenObservable.Subscription | undefined
  _baseEventWatcher: BaseEventWatcher
  _pubsub: PubSub
  _jobQueue: JobQueue

  constructor (ethClient: EthClient, indexer: Indexer) {
  constructor (ethClient: EthClient, indexer: Indexer, pubsub: PubSub, jobQueue: JobQueue) {
    assert(ethClient);
    assert(indexer);

    this._ethClient = ethClient;
    this._indexer = indexer;
    this._pubsub = pubsub;
    this._jobQueue = jobQueue;
    this._baseEventWatcher = new BaseEventWatcher(this._ethClient, this._indexer, this._pubsub, this._jobQueue);
  }

  getEventIterator (): AsyncIterator<any> {
    return this._pubsub.asyncIterator([EVENT]);
  }

  getBlockProgressEventIterator (): AsyncIterator<any> {
    return this._baseEventWatcher.getBlockProgressEventIterator();
  }

  async start (): Promise<void> {
    assert(!this._subscription, 'subscription already started');

    log('Started watching upstream logs...');

    this._subscription = await this._ethClient.watchLogs(async (value) => {
      const receipt = _.get(value, 'data.listen.relatedNode');
      log('watchLogs', JSON.stringify(receipt, null, 2));

      // Check if this log is for a contract we care about.
      const { logContracts } = receipt;
      if (logContracts && logContracts.length) {
        for (let logIndex = 0; logIndex < logContracts.length; logIndex++) {
          const contractAddress = logContracts[logIndex];
          const isWatchedContract = await this._indexer.isWatchedContract(contractAddress);
          if (isWatchedContract) {
            // TODO: Move processing to background task runner.

            const { ethTransactionCidByTxId: { ethHeaderCidByHeaderId: { blockHash, blockNumber } } } = receipt;
            await this._indexer.getEvents(blockHash, contractAddress, null);

            // Trigger other indexer methods based on event topic.
            await this._indexer.processEvent(blockHash, blockNumber, contractAddress, receipt, logIndex);
    await this.watchBlocksAtChainHead();
    await this.initBlockProcessingOnCompleteHandler();
    await this.initEventProcessingOnCompleteHandler();
  }

  async stop (): Promise<void> {
    this._baseEventWatcher.stop();
  }

  async watchBlocksAtChainHead (): Promise<void> {
    log('Started watching upstream blocks...');
    this._subscription = await this._ethClient.watchBlocks(async (value) => {
      await this._baseEventWatcher.blocksHandler(value);
    });
  }

  async initBlockProcessingOnCompleteHandler (): Promise<void> {
    this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
      await this._baseEventWatcher.blockProcessingCompleteHandler(job);
    });
  }

  async initEventProcessingOnCompleteHandler (): Promise<void> {
    await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
      const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);

      const { data: { request, failed, state, createdOn } } = job;

      const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
      log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
      if (!failed && state === 'completed' && request.data.publish) {
        // Check for max acceptable lag time between request and sending results to live subscribers.
        if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) {
          await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds);
        } else {
          log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`);
        }
      }
    });
  }

  async stop (): Promise<void> {
    if (this._subscription) {
      log('Stopped watching upstream logs');
      this._subscription.unsubscribe();
  async publishEventToSubscribers (dbEvent: Event, timeElapsedInSeconds: number): Promise<void> {
    if (dbEvent && dbEvent.eventName !== UNKNOWN_EVENT_NAME) {
      const { block: { blockHash }, contract: token } = dbEvent;
      const resultEvent = this._indexer.getResultEvent(dbEvent);

      log(`pushing event to GQL subscribers (${timeElapsedInSeconds}s elapsed): ${resultEvent.event.__typename}`);

      // Publishing the event here will result in pushing the payload to GQL subscribers for `onEvent`.
      await this._pubsub.publish(EVENT, {
        onTokenEvent: {
          blockHash,
          token,
          event: resultEvent
        }
      });
    }
  }
}
packages/erc20-watcher/src/fill.ts (new file, 95 lines)
@@ -0,0 +1,95 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'apollo-server-express';
import { getDefaultProvider } from 'ethers';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH } from '@vulcanize/util';

import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:server');

export const main = async (): Promise<any> => {
  const argv = await yargs(hideBin(process.argv)).parserConfiguration({
    'parse-numbers': false
  }).options({
    configFile: {
      alias: 'f',
      type: 'string',
      require: true,
      demandOption: true,
      describe: 'configuration file path (toml)',
      default: DEFAULT_CONFIG_PATH
    },
    startBlock: {
      type: 'number',
      require: true,
      demandOption: true,
      describe: 'Block number to start processing at'
    },
    endBlock: {
      type: 'number',
      require: true,
      demandOption: true,
      describe: 'Block number to stop processing at'
    }
  }).argv;

  const config = await getConfig(argv.configFile);

  assert(config.server, 'Missing server config');

  const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config;

  assert(dbConfig, 'Missing database config');

  const db = new Database(dbConfig);
  await db.init();

  assert(upstream, 'Missing upstream config');
  const { ethServer: { gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
  assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

  const cache = await getCache(cacheConfig);
  const ethClient = new EthClient({
    gqlEndpoint: gqlPostgraphileEndpoint,
    gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
    cache
  });

  const ethProvider = getDefaultProvider(rpcProviderEndpoint);

  // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
  // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
  const pubsub = new PubSub();
  const indexer = new Indexer(db, ethClient, ethProvider, mode);

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);

  assert(jobQueueConfig, 'Missing job queue config');

  await fillBlocks(jobQueue, indexer, ethClient, eventWatcher, argv);
};

main().then(() => {
  process.exit();
}).catch(err => {
  log(err);
});
@@ -4,38 +4,32 @@

import assert from 'assert';
import debug from 'debug';
import { invert } from 'lodash';
import { JsonFragment } from '@ethersproject/abi';
import { DeepPartial } from 'typeorm';
import JSONbig from 'json-bigint';
import { BigNumber, ethers } from 'ethers';
import { BaseProvider } from '@ethersproject/providers';
import { PubSub } from 'apollo-server-express';

import { EthClient, topictoAddress } from '@vulcanize/ipld-eth-client';
import { getEventNameTopics, getStorageValue, GetStorageAt, StorageLayout } from '@vulcanize/solidity-mapper';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { StorageLayout } from '@vulcanize/solidity-mapper';
import { EventInterface, Indexer as BaseIndexer, ValueResult } from '@vulcanize/util';

import { Database } from './database';
import { Event } from './entity/Event';
import { Event, UNKNOWN_EVENT_NAME } from './entity/Event';
import { fetchTokenDecimals, fetchTokenName, fetchTokenSymbol, fetchTokenTotalSupply } from './utils';
import { SyncStatus } from './entity/SyncStatus';
import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';

const log = debug('vulcanize:indexer');

const ETH_CALL_MODE = 'eth_call';

interface Artifacts {
  abi: JsonFragment[];
  storageLayout: StorageLayout;
}
const TRANSFER_EVENT = 'Transfer';
const APPROVAL_EVENT = 'Approval';

export interface ValueResult {
  value: string | bigint;
  proof?: {
    data: string;
  }
}

type EventsResult = Array<{
interface EventResult {
  event: {
    from?: string;
    to?: string;
@@ -45,46 +39,51 @@ type EventsResult = Array<{
    __typename: string;
  }
  proof?: string;
}>
}

export class Indexer {
  _db: Database
  _ethClient: EthClient
  _pubsub: PubSub
  _getStorageAt: GetStorageAt
  _ethProvider: BaseProvider
  _baseIndexer: BaseIndexer

  _abi: JsonFragment[]
  _storageLayout: StorageLayout
  _contract: ethers.utils.Interface
  _serverMode: string

  constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, pubsub: PubSub, artifacts: Artifacts, serverMode: string) {
  constructor (db: Database, ethClient: EthClient, ethProvider: BaseProvider, serverMode: string) {
    assert(db);
    assert(ethClient);
    assert(pubsub);
    assert(artifacts);

    this._db = db;
    this._ethClient = ethClient;
    this._ethProvider = ethProvider;
    this._serverMode = serverMode;
    this._baseIndexer = new BaseIndexer(this._db, this._ethClient);

    const { abi, storageLayout } = artifacts;

    assert(abi);
    assert(storageLayout);

    this._db = db;
    this._ethClient = ethClient;
    this._ethProvider = ethProvider;
    this._pubsub = pubsub;
    this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
    this._serverMode = serverMode;

    this._abi = abi;
    this._storageLayout = storageLayout;

    this._contract = new ethers.utils.Interface(this._abi);
  }

  getEventIterator (): AsyncIterator<any> {
    return this._pubsub.asyncIterator(['event']);
  getResultEvent (event: Event): EventResult {
    const eventFields = JSON.parse(event.eventInfo);

    return {
      event: {
        __typename: `${event.eventName}Event`,
        ...eventFields
      },
      // TODO: Return proof only if requested.
      proof: JSON.parse(event.proof)
    };
  }

  async totalSupply (blockHash: string, token: string): Promise<ValueResult> {
@@ -95,7 +94,7 @@ export class Indexer {

      result = { value };
    } else {
      result = await this._getStorageValue(blockHash, token, '_totalSupply');
      result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_totalSupply');
    }

    // https://github.com/GoogleChromeLabs/jsbi/issues/30#issuecomment-521460510
@@ -130,7 +129,7 @@ export class Indexer {
        value: BigInt(value.toString())
      };
    } else {
      result = await this._getStorageValue(blockHash, token, '_balances', owner);
      result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_balances', owner);
    }

    log(JSONbig.stringify(result, null, 2));
@@ -165,7 +164,7 @@ export class Indexer {
        value: BigInt(value.toString())
      };
    } else {
      result = await this._getStorageValue(blockHash, token, '_allowances', owner, spender);
      result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_allowances', owner, spender);
    }

    // log(JSONbig.stringify(result, null, 2));
@@ -184,7 +183,7 @@ export class Indexer {

      result = { value };
    } else {
      result = await this._getStorageValue(blockHash, token, '_name');
      result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_name');
    }

    // log(JSONbig.stringify(result, null, 2));
@@ -200,7 +199,7 @@ export class Indexer {

      result = { value };
    } else {
      result = await this._getStorageValue(blockHash, token, '_symbol');
      result = await this._baseIndexer.getStorageValue(this._storageLayout, blockHash, token, '_symbol');
    }

    // log(JSONbig.stringify(result, null, 2));
@@ -224,88 +223,24 @@ export class Indexer {
    return result;
  }

  async getEvents (blockHash: string, token: string, name: string | null): Promise<EventsResult> {
    const didSyncEvents = await this._db.didSyncEvents({ blockHash, token });
    if (!didSyncEvents) {
      // Fetch and save events first and make a note in the event sync progress table.
      await this._fetchAndSaveEvents({ blockHash, token });
      log('getEvents: db miss, fetching from upstream server');
    }

    assert(await this._db.didSyncEvents({ blockHash, token }));

    const events = await this._db.getEvents({ blockHash, token });
    log('getEvents: db hit');

    const result = events
      // TODO: Filter using db WHERE condition when name is not empty.
      .filter(event => !name || name === event.eventName)
      .map(e => {
        const eventFields: {
          from?: string,
          to?: string,
          value?: BigInt,
          owner?: string,
          spender?: string,
        } = {};

        switch (e.eventName) {
          case 'Transfer': {
            eventFields.from = e.transferFrom;
            eventFields.to = e.transferTo;
            eventFields.value = e.transferValue;
            break;
          }
          case 'Approval': {
            eventFields.owner = e.approvalOwner;
            eventFields.spender = e.approvalSpender;
            eventFields.value = e.approvalValue;
            break;
          }
        }

        return {
          event: {
            __typename: `${e.eventName}Event`,
            ...eventFields
          },
          // TODO: Return proof only if requested.
          proof: JSON.parse(e.proof)
        };
      });

    // log(JSONbig.stringify(result, null, 2));

    return result;
  }

  async triggerIndexingOnEvent (blockHash: string, blockNumber: number, token: string, receipt: any, logIndex: number): Promise<void> {
    const topics = [];

    // We only care about the event type for now.
    const data = '0x0000000000000000000000000000000000000000000000000000000000000000';

    topics.push(receipt.topic0S[logIndex]);
    topics.push(receipt.topic1S[logIndex]);
    topics.push(receipt.topic2S[logIndex]);

    const { name: eventName, args } = this._contract.parseLog({ topics, data });
    log(`trigger indexing on event: ${eventName} ${args}`);
  async triggerIndexingOnEvent (event: Event): Promise<void> {
    const { eventName, eventInfo, contract: token, block: { blockHash } } = event;
    const eventFields = JSON.parse(eventInfo);

    // What data we index depends on the kind of event.
    switch (eventName) {
      case 'Transfer': {
      case TRANSFER_EVENT: {
        // On a transfer, balances for both parties change.
        // Therefore, trigger indexing for both sender and receiver.
        const [from, to] = args;
        const { from, to } = eventFields;
        await this.balanceOf(blockHash, token, from);
        await this.balanceOf(blockHash, token, to);

        break;
      }
      case 'Approval': {
      case APPROVAL_EVENT: {
        // Update allowance for (owner, spender) combination.
        const [owner, spender] = args;
        const { owner, spender } = eventFields;
        await this.allowance(blockHash, token, owner, spender);

        break;
@@ -313,35 +248,44 @@ export class Indexer {
      }
    }
  }

  async publishEventToSubscribers (blockHash: string, token: string, logIndex: number): Promise<void> {
    // TODO: Optimize this fetching of events.
    const events = await this.getEvents(blockHash, token, null);
    const event = events[logIndex];

    log(`pushing event to GQL subscribers: ${event.event.__typename}`);

    // Publishing the event here will result in pushing the payload to GQL subscribers for `onTokenEvent`.
    await this._pubsub.publish('event', {
      onTokenEvent: {
        blockHash,
        token,
        event
      }
    });
  }

  async processEvent (blockHash: string, blockNumber: number, token: string, receipt: any, logIndex: number): Promise<void> {
  async processEvent (event: Event): Promise<void> {
    // Trigger indexing of data based on the event.
    await this.triggerIndexingOnEvent(blockHash, blockNumber, token, receipt, logIndex);

    // Also trigger downstream event watcher subscriptions.
    await this.publishEventToSubscribers(blockHash, token, logIndex);
    await this.triggerIndexingOnEvent(event);
  }

  async isWatchedContract (address : string): Promise<boolean> {
    assert(address);
  parseEventNameAndArgs (kind: string, logObj: any): any {
    let eventName = UNKNOWN_EVENT_NAME;
    let eventInfo = {};

    return this._db.isWatchedContract(ethers.utils.getAddress(address));
    const { topics, data } = logObj;
    const logDescription = this._contract.parseLog({ data, topics });

    switch (logDescription.name) {
      case TRANSFER_EVENT: {
        eventName = logDescription.name;
        const [from, to, value] = logDescription.args;
        eventInfo = {
          from,
          to,
          value: value.toString()
        };

        break;
      }
      case APPROVAL_EVENT: {
        eventName = logDescription.name;
        const [owner, spender, value] = logDescription.args;
        eventInfo = {
          owner,
          spender,
          value: value.toString()
        };

        break;
      }
    }

    return { eventName, eventInfo };
  }

  async watchContract (address: string, startingBlock: number): Promise<boolean> {
@@ -351,67 +295,156 @@ export class Indexer {
    return true;
  }

  // TODO: Move into base/class or framework package.
  async _getStorageValue (blockHash: string, token: string, variable: string, ...mappingKeys: string[]): Promise<ValueResult> {
    return getStorageValue(
      this._storageLayout,
      this._getStorageAt,
      blockHash,
      token,
      variable,
      ...mappingKeys
    );
  async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
    return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
  }

  async _fetchAndSaveEvents ({ blockHash, token }: { blockHash: string, token: string }): Promise<void> {
    const { logs } = await this._ethClient.getLogs({ blockHash, contract: token });
  async isWatchedContract (address : string): Promise<Contract | undefined> {
    return this._baseIndexer.isWatchedContract(address);
  }

    const eventNameToTopic = getEventNameTopics(this._abi);
    const logTopicToEventName = invert(eventNameToTopic);
  async saveEventEntity (dbEvent: Event): Promise<Event> {
    return this._baseIndexer.saveEventEntity(dbEvent);
  }

    const dbEvents = logs.map((log: any) => {
      const { topics, data: value, cid, ipldBlock } = log;
  async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
    return this._baseIndexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
  }

      const [topic0, topic1, topic2] = topics;
  async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<Event>> {
    return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber);
  }

      const eventName = logTopicToEventName[topic0];
      const address1 = topictoAddress(topic1);
      const address2 = topictoAddress(topic2);
  async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
    return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
  }

      const event: DeepPartial<Event> = {
        blockHash,
        token,
  async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
    return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
  }

  async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
    return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
  }

  async getSyncStatus (): Promise<SyncStatus | undefined> {
    return this._baseIndexer.getSyncStatus();
  }

  async getBlock (blockHash: string): Promise<any> {
    return this._baseIndexer.getBlock(blockHash);
  }

  async getEvent (id: string): Promise<Event | undefined> {
    return this._baseIndexer.getEvent(id);
  }

  async getBlockProgress (blockHash: string): Promise<BlockProgress | undefined> {
    return this._baseIndexer.getBlockProgress(blockHash);
  }

  async getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgress[]> {
    return this._baseIndexer.getBlocksAtHeight(height, isPruned);
  }

  async getOrFetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<Array<EventInterface>> {
    return this._baseIndexer.getOrFetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
  }

  async getBlockEvents (blockHash: string): Promise<Array<Event>> {
    return this._baseIndexer.getBlockEvents(blockHash);
  }

  async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
    return this._baseIndexer.markBlocksAsPruned(blocks);
  }

  async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> {
    return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex);
  }

  async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
    return this._baseIndexer.getAncestorAtDepth(blockHash, depth);
  }

  async _fetchAndSaveEvents ({ blockHash }: DeepPartial<BlockProgress>): Promise<void> {
    assert(blockHash);
    let { block, logs } = await this._ethClient.getLogs({ blockHash });

    const dbEvents: Array<DeepPartial<Event>> = [];

    for (let li = 0; li < logs.length; li++) {
      const logObj = logs[li];
      const {
        topics,
        data,
        index: logIndex,
        cid,
        ipldBlock,
        account: {
          address
        },
        transaction: {
          hash: txHash
        },
        receiptCID,
        status
      } = logObj;

      if (status) {
        let eventName = UNKNOWN_EVENT_NAME;
        let eventInfo = {};
        const extraInfo = { topics, data };

        const contract = ethers.utils.getAddress(address);
        const watchedContract = await this.isWatchedContract(contract);

        if (watchedContract) {
          const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj);
          eventName = eventDetails.eventName;
          eventInfo = eventDetails.eventInfo;
        }

        dbEvents.push({
          index: logIndex,
          txHash,
          contract,
          eventName,

          eventInfo: JSONbig.stringify(eventInfo),
          extraInfo: JSONbig.stringify(extraInfo),
          proof: JSONbig.stringify({
            data: JSONbig.stringify({
              blockHash,
              receipt: {
                receiptCID,
                log: {
                  cid,
                  ipldBlock
                }
              }
            })
          })
        });
      } else {
        log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
      }
    }

    const dbTx = await this._db.createTransactionRunner();

    try {
      block = {
        blockHash,
        blockNumber: block.number,
        blockTimestamp: block.timestamp,
        parentHash: block.parent.hash
      };

      switch (eventName) {
        case 'Transfer': {
          event.transferFrom = address1;
          event.transferTo = address2;
          event.transferValue = BigInt(value);
          break;
        }
        case 'Approval': {
          event.approvalOwner = address1;
          event.approvalSpender = address2;
          event.approvalValue = BigInt(value);
          break;
      await this._db.saveEvents(dbTx, block, dbEvents);
      await dbTx.commitTransaction();
    } catch (error) {
      await dbTx.rollbackTransaction();
      throw error;
    } finally {
      await dbTx.release();
    }
  }

      return event;
    });

    await this._db.saveEvents({ blockHash, token, events: dbEvents });
  }
}
packages/erc20-watcher/src/job-runner.ts (new file, 126 lines)
@@ -0,0 +1,126 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import assert from 'assert';
import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { getDefaultProvider } from 'ethers';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import {
  getConfig,
  JobQueue,
  JobRunner as BaseJobRunner,
  QUEUE_BLOCK_PROCESSING,
  QUEUE_EVENT_PROCESSING,
  JobQueueConfig,
  DEFAULT_CONFIG_PATH
} from '@vulcanize/util';

import { Indexer } from './indexer';
import { Database } from './database';

const log = debug('vulcanize:job-runner');

export class JobRunner {
  _indexer: Indexer
  _jobQueue: JobQueue
  _baseJobRunner: BaseJobRunner
  _jobQueueConfig: JobQueueConfig

  constructor (jobQueueConfig: JobQueueConfig, indexer: Indexer, jobQueue: JobQueue) {
    this._indexer = indexer;
    this._jobQueue = jobQueue;
    this._jobQueueConfig = jobQueueConfig;
    this._baseJobRunner = new BaseJobRunner(this._jobQueueConfig, this._indexer, this._jobQueue);
  }

  async start (): Promise<void> {
    await this.subscribeBlockProcessingQueue();
    await this.subscribeEventProcessingQueue();
  }

  async subscribeBlockProcessingQueue (): Promise<void> {
    await this._jobQueue.subscribe(QUEUE_BLOCK_PROCESSING, async (job) => {
      await this._baseJobRunner.processBlock(job);

      await this._jobQueue.markComplete(job);
    });
  }

  async subscribeEventProcessingQueue (): Promise<void> {
    await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => {
      const event = await this._baseJobRunner.processEvent(job);

      const watchedContract = await this._indexer.isWatchedContract(event.contract);
      if (watchedContract) {
        await this._indexer.processEvent(event);
      }

      await this._jobQueue.markComplete(job);
    });
  }
}

export const main = async (): Promise<any> => {
  const argv = await yargs(hideBin(process.argv))
    .option('f', {
      alias: 'config-file',
      demandOption: true,
      describe: 'configuration file path (toml)',
      type: 'string',
      default: DEFAULT_CONFIG_PATH
    })
    .argv;

  const config = await getConfig(argv.f);

  assert(config.server, 'Missing server config');

  const { upstream, database: dbConfig, jobQueue: jobQueueConfig, server: { mode } } = config;

  assert(dbConfig, 'Missing database config');

  const db = new Database(dbConfig);
  await db.init();

  assert(upstream, 'Missing upstream config');
  const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstream;
  assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
  assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');

  const cache = await getCache(cacheConfig);
  const ethClient = new EthClient({
    gqlEndpoint: gqlApiEndpoint,
    gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
    cache
  });

  const ethProvider = getDefaultProvider(rpcProviderEndpoint);
  const indexer = new Indexer(db, ethClient, ethProvider, mode);

  assert(jobQueueConfig, 'Missing job queue config');

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue);
  await jobRunner.start();
};

main().then(() => {
  log('Starting job runner...');
}).catch(err => {
  log(err);
});

process.on('uncaughtException', err => {
  log('uncaughtException', err);
});
@@ -6,11 +6,14 @@ import assert from 'assert';
import BigInt from 'apollo-type-bigint';
import debug from 'debug';

import { Indexer, ValueResult } from './indexer';
import { ValueResult } from '@vulcanize/util';

import { Indexer } from './indexer';
import { EventWatcher } from './events';

const log = debug('vulcanize:resolver');

export const createResolvers = async (indexer: Indexer): Promise<any> => {
export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
  assert(indexer);

  return {
@@ -26,7 +29,7 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {

    Subscription: {
      onTokenEvent: {
        subscribe: () => indexer.getEventIterator()
        subscribe: () => eventWatcher.getEventIterator()
      }
    },

@@ -71,7 +74,26 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {

      events: async (_: any, { blockHash, token, name }: { blockHash: string, token: string, name: string }) => {
        log('events', blockHash, token, name || '');
        return indexer.getEvents(blockHash, token, name);

        const block = await indexer.getBlockProgress(blockHash);
        if (!block || !block.isComplete) {
          throw new Error(`Block hash ${blockHash} number ${block?.blockNumber} not processed yet`);
        }

        const events = await indexer.getEventsByFilter(blockHash, token, name);
        return events.map(event => indexer.getResultEvent(event));
      },

      eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => {
        log('eventsInRange', fromBlockNumber, toBlockNumber);

        const { expected, actual } = await indexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
        if (expected !== actual) {
          throw new Error(`Range not available, expected ${expected}, got ${actual} blocks in range`);
        }

        const events = await indexer.getEventsInRange(fromBlockNumber, toBlockNumber);
        return events.map(event => indexer.getResultEvent(event));
      }
    }
  };
@@ -133,6 +133,12 @@ type Query {
    token: String!
    name: String
  ): [ResultEvent!]

  # Get token events in a given block range.
  eventsInRange(
    fromBlockNumber: Int!
    toBlockNumber: Int!
  ): [ResultEvent!]
}

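Once every block in the requested range has been processed, the new query can be exercised from the GQL console. A minimal sketch (the block numbers are made up, and the `proof { data }` sub-selection assumes the `ResultEvent` shape implied by the resolver code above; the full schema is not shown in this diff):

```graphql
query {
  eventsInRange(fromBlockNumber: 1000, toBlockNumber: 2000) {
    event {
      __typename
    }
    proof {
      data
    }
  }
}
```

If some blocks in the range are not yet processed, the resolver throws `Range not available` rather than returning partial results.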
#
@@ -15,9 +15,8 @@ import { getDefaultProvider } from 'ethers';

import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { DEFAULT_CONFIG_PATH, getConfig } from '@vulcanize/util';
import { DEFAULT_CONFIG_PATH, getConfig, JobQueue } from '@vulcanize/util';

import artifacts from './artifacts/ERC20.json';
import typeDefs from './schema';

import { createResolvers as createMockResolvers } from './mock/resolvers';
@@ -45,7 +44,7 @@ export const main = async (): Promise<any> => {

  const { host, port, mode } = config.server;

  const { upstream, database: dbConfig } = config;
  const { upstream, database: dbConfig, jobQueue: jobQueueConfig } = config;

  assert(dbConfig, 'Missing database config');

@@ -69,12 +68,20 @@ export const main = async (): Promise<any> => {
  // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
  // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
  const pubsub = new PubSub();
  const indexer = new Indexer(db, ethClient, ethProvider, pubsub, artifacts, mode);
  const indexer = new Indexer(db, ethClient, ethProvider, mode);

  const eventWatcher = new EventWatcher(ethClient, indexer);
  assert(jobQueueConfig, 'Missing job queue config');

  const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
  assert(dbConnectionString, 'Missing job queue db connection string');

  const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
  await jobQueue.start();

  const eventWatcher = new EventWatcher(ethClient, indexer, pubsub, jobQueue);
  await eventWatcher.start();

  const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer);
  const resolvers = process.env.MOCK ? await createMockResolvers() : await createResolvers(indexer, eventWatcher);

  const app: Application = express();
  const server = new ApolloServer({
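For reference, the `jobQueue` section that `main()` now destructures has this shape (field names are taken from the hunk above; the interface itself is a sketch, not an exported type):

```typescript
// Sketch of config.jobQueue as consumed by main() above.
interface JobQueueConfig {
  // Connection string for the *-job-queue database set up during installation.
  dbConnectionString: string;
  // Passed to the JobQueue constructor as maxCompletionLag.
  maxCompletionLagInSecs: number;
}
```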
34
packages/erc20-watcher/test/tasks/token-approve.ts
Normal file
@@ -0,0 +1,34 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import { task, types } from 'hardhat/config';
import '@nomiclabs/hardhat-ethers';
import { ContractTransaction, BigNumber } from 'ethers';

const DEFAULT_APPROVE_AMOUNT = '1000000000000000000000000';

task('token-approve', 'Approve tokens for a spender')
  .addParam('token', 'Token contract address', undefined, types.string)
  .addParam('spender', 'Spender address', undefined, types.string)
  .addParam('amount', 'Token amount to approve', DEFAULT_APPROVE_AMOUNT, types.string)
  .setAction(async (args, hre) => {
    const { token: tokenAddress, amount, spender } = args;
    await hre.run('compile');
    const Token = await hre.ethers.getContractFactory('GLDToken');
    const token = Token.attach(tokenAddress);

    const transaction: ContractTransaction = await token.approve(spender, BigNumber.from(amount));
    const receipt = await transaction.wait();

    if (receipt.events) {
      // Find the Approval event in the receipt.
      const ApprovalEvent = receipt.events.find(el => el.event === 'Approval');

      if (ApprovalEvent && ApprovalEvent.args) {
        console.log('Approval Event');
        console.log('owner:', ApprovalEvent.args.owner.toString());
        console.log('spender:', ApprovalEvent.args.spender.toString());
        console.log('value:', ApprovalEvent.args.value.toString());
      }
    }
  });
37
packages/erc20-watcher/test/tasks/token-transfer-from.ts
Normal file
@@ -0,0 +1,37 @@
//
// Copyright 2021 Vulcanize, Inc.
//

import { task, types } from 'hardhat/config';
import '@nomiclabs/hardhat-ethers';
import { ContractTransaction } from 'ethers';

task('token-transfer-from', 'Send tokens as spender')
  .addParam('token', 'Token contract address', undefined, types.string)
  .addParam('spenderKey', 'Spender private key', undefined, types.string)
  .addParam('to', 'Transfer recipient address', undefined, types.string)
  .addParam('amount', 'Token amount to transfer', undefined, types.int)
  .setAction(async (args, hre) => {
    const { token: tokenAddress, to, amount, spenderKey } = args;
    await hre.run('compile');
    const [owner] = await hre.ethers.getSigners();
    const wallet = new hre.ethers.Wallet(spenderKey, hre.ethers.provider);
    const Token = await hre.ethers.getContractFactory('GLDToken');
    let token = Token.attach(tokenAddress);

    token = token.connect(wallet);
    const transaction: ContractTransaction = await token.transferFrom(owner.address, to, amount);

    const receipt = await transaction.wait();

    if (receipt.events) {
      const TransferEvent = receipt.events.find(el => el.event === 'Transfer');

      if (TransferEvent && TransferEvent.args) {
        console.log('Transfer Event');
        console.log('from:', TransferEvent.args.from.toString());
        console.log('to:', TransferEvent.args.to.toString());
        console.log('value:', TransferEvent.args.value.toString());
      }
    }
  });
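The two tasks compose into an approve-then-spend flow and can be driven from a script through `hre.run`. A sketch with placeholder addresses and key, assuming a `GLDToken` is already deployed:

```typescript
// Hypothetical driver script for the two tasks above; all 0x... values are placeholders.
import hre from 'hardhat';

async function main (): Promise<void> {
  const token = '0x...';      // deployed GLDToken address
  const spender = '0x...';    // account that will call transferFrom
  const spenderKey = '0x...'; // private key for the spender account

  // Owner approves the spender for the default allowance.
  await hre.run('token-approve', { token, spender });

  // Spender moves 100 tokens from the owner to a recipient.
  await hre.run('token-transfer-from', { token, spenderKey, to: '0x...', amount: 100 });
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});
```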
@@ -103,10 +103,6 @@ export class EthClient {
    return this._graphqlClient.subscribe(ethQueries.subscribeBlocks, onNext);
  }

  async watchLogs (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
    return this._graphqlClient.subscribe(ethQueries.subscribeLogs, onNext);
  }

  async watchTransactions (onNext: (value: any) => void): Promise<ZenObservable.Subscription> {
    return this._graphqlClient.subscribe(ethQueries.subscribeTransactions, onNext);
  }
@@ -28,6 +28,8 @@ query getLogs($blockHash: Bytes32!, $contract: Address) {
      index
      cid
      ipldBlock
      receiptCID
      status
    }
    block(hash: $blockHash) {
      number
@@ -75,31 +77,6 @@ query block($blockHash: Bytes32) {
  }
`;

export const subscribeLogs = gql`
subscription SubscriptionReceipt {
  listen(topic: "receipt_cids") {
    relatedNode {
      ... on ReceiptCid {
        logContracts
        topic0S
        topic1S
        topic2S
        topic3S
        contract
        ethTransactionCidByTxId {
          txHash
          ethHeaderCidByHeaderId {
            blockHash
            blockNumber
            parentHash
          }
        }
      }
    }
  }
}
`;

export const subscribeBlocks = gql`
subscription {
  listen(topic: "header_cids") {
@@ -137,7 +114,6 @@ export default {
  getLogs,
  getBlockWithTransactions,
  getBlockByHash,
  subscribeLogs,
  subscribeBlocks,
  subscribeTransactions
};
@@ -116,9 +116,12 @@ export class Indexer {
        },
        transaction: {
          hash: txHash
        }
      },
      receiptCID,
      status
    } = logObj;

    if (status) {
      const tx = transactionMap[txHash];
      assert(ethers.utils.getAddress(address) === contract);

@@ -156,14 +159,18 @@
        },
        proof: {
          data: JSONbig.stringify({
            blockHash: hash,
            receipt: {
              blockHash,
              receiptCID,
              log: {
                cid,
                ipldBlock
              }
            })
          }
        });
      } else {
        log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
      }
    }

    return events;
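Taken with the `cid` and `ipldBlock` fields added to `getLogs` earlier, the proof payload now points at the log's own IPLD data. Roughly, the stringified `data` has this shape (a sketch, not a type exported by the package):

```typescript
// Approximate shape of the proof data after this change.
interface EventProofData {
  blockHash: string;
  receipt: {
    receiptCID: string;
    log: {
      cid: string;       // CID of the log's IPLD block
      ipldBlock: string; // encoded IPLD block for the log
    };
  };
}
```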
@@ -5,23 +5,23 @@
  "license": "AGPL-3.0",
  "private": true,
  "dependencies": {
    "@apollo/client": "^3.3.19",
    "@vulcanize/cache": "^0.1.0",
    "@vulcanize/erc20-watcher": "^0.1.0",
    "@vulcanize/ipld-eth-client": "^0.1.0",
    "@vulcanize/util": "^0.1.0",
    "@vulcanize/uni-watcher": "^0.1.0",
    "@apollo/client": "^3.3.19",
    "@vulcanize/util": "^0.1.0",
    "apollo-server-express": "^2.25.0",
    "apollo-type-bigint": "^0.1.3",
    "decimal.js": "^10.3.1",
    "typeorm": "^0.2.32",
    "debug": "^4.3.1",
    "reflect-metadata": "^0.1.13",
    "graphql-request": "^3.4.0",
    "yargs": "^17.0.1",
    "json-bigint": "^1.0.0",
    "decimal.js": "^10.3.1",
    "express": "^4.17.1",
    "graphql-import-node": "^0.0.4"
    "graphql-import-node": "^0.0.4",
    "graphql-request": "^3.4.0",
    "json-bigint": "^1.0.0",
    "reflect-metadata": "^0.1.13",
    "typeorm": "^0.2.32",
    "yargs": "^17.0.1"
  },
  "scripts": {
    "lint": "eslint .",
@@ -56,13 +56,13 @@
    "eslint-plugin-node": "^11.1.0",
    "eslint-plugin-promise": "^5.1.0",
    "eslint-plugin-standard": "^5.0.0",
    "ethers": "^5.2.0",
    "get-graphql-schema": "^2.1.2",
    "graphql-schema-linter": "^2.0.1",
    "lodash": "^4.17.21",
    "mocha": "^8.4.0",
    "nodemon": "^2.0.7",
    "ts-node": "^10.0.0",
    "typescript": "^4.3.2",
    "ethers": "^5.2.0",
    "lodash": "^4.17.21"
    "typescript": "^4.3.2"
  }
}
@@ -556,10 +556,10 @@ export class Database implements DatabaseInterface {
    return this._baseDatabase.saveEventEntity(repo, entity);
  }

  async getBlockEvents (blockHash: string): Promise<Event[]> {
  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
    const repo = this._conn.getRepository(Event);

    return this._baseDatabase.getBlockEvents(repo, blockHash);
    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
  }

  async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@@ -37,13 +37,6 @@ const SYNC_DELTA = 5;

const log = debug('vulcanize:indexer');

export interface ValueResult {
  value: string | bigint;
  proof: {
    data: string;
  }
}

export { OrderDirection, BlockHeight };

export class Indexer implements IndexerInterface {
@@ -32,12 +32,12 @@
  },
  "homepage": "https://github.com/vulcanize/watcher-ts#readme",
  "dependencies": {
    "@apollo/client": "^3.3.19",
    "@types/lodash": "^4.14.168",
    "@vulcanize/cache": "^0.1.0",
    "@vulcanize/ipld-eth-client": "^0.1.0",
    "@vulcanize/solidity-mapper": "^0.1.0",
    "@vulcanize/util": "^0.1.0",
    "@apollo/client": "^3.3.19",
    "apollo-server-express": "^2.25.0",
    "apollo-type-bigint": "^0.1.3",
    "debug": "^4.3.1",
@@ -37,13 +37,6 @@ export class Database implements DatabaseInterface {
    return this._baseDatabase.close();
  }

  async getContract (address: string): Promise<Contract | undefined> {
    return this._conn.getRepository(Contract)
      .createQueryBuilder('contract')
      .where('address = :address', { address })
      .getOne();
  }

  async getLatestContract (kind: string): Promise<Contract | undefined> {
    return this._conn.getRepository(Contract)
      .createQueryBuilder('contract')
@@ -52,6 +45,12 @@ export class Database implements DatabaseInterface {
      .getOne();
  }

  async getContract (address: string): Promise<Contract | undefined> {
    const repo = this._conn.getRepository(Contract);

    return this._baseDatabase.getContract(repo, address);
  }

  async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise<void> {
    const repo = queryRunner.manager.getRepository(Contract);

@@ -79,10 +78,10 @@ export class Database implements DatabaseInterface {
    return this._baseDatabase.saveEventEntity(repo, entity);
  }

  async getBlockEvents (blockHash: string): Promise<Event[]> {
  async getBlockEvents (blockHash: string, where: FindConditions<Event>): Promise<Event[]> {
    const repo = this._conn.getRepository(Event);

    return this._baseDatabase.getBlockEvents(repo, blockHash);
    return this._baseDatabase.getBlockEvents(repo, blockHash, where);
  }

  async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<void> {
@@ -9,7 +9,6 @@ import { ethers } from 'ethers';
import assert from 'assert';

import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper';
import { IndexerInterface, Indexer as BaseIndexer } from '@vulcanize/util';

import { Database } from './database';
@@ -36,18 +35,10 @@ type ResultEvent = {
  proof: string;
};

interface ValueResult {
  value: any;
  proof: {
    data: string;
  }
}

export class Indexer implements IndexerInterface {
  _db: Database
  _ethClient: EthClient
  _postgraphileClient: EthClient
  _getStorageAt: GetStorageAt
  _baseIndexer: BaseIndexer

  _factoryContract: ethers.utils.Interface
@@ -58,7 +49,6 @@ export class Indexer implements IndexerInterface {
    this._db = db;
    this._ethClient = ethClient;
    this._postgraphileClient = postgraphileClient;
    this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
    this._baseIndexer = new BaseIndexer(this._db, this._ethClient);

    this._factoryContract = new ethers.utils.Interface(factoryABI);
@@ -99,27 +89,6 @@ export class Indexer implements IndexerInterface {
    };
  }

  async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
    if (contract) {
      const uniContract = await this.isUniswapContract(contract);
      if (!uniContract) {
        throw new Error('Not a uniswap contract');
      }
    }

    const events = await this._db.getBlockEvents(blockHash);
    log(`getEvents: db hit, num events: ${events.length}`);

    // Filtering.
    const result = events
      // TODO: Filter using db WHERE condition on contract.
      .filter(event => !contract || contract === event.contract)
      // TODO: Filter using db WHERE condition when name is not empty.
      .filter(event => !name || name === event.eventName);

    return result;
  }

  async triggerIndexingOnEvent (dbTx: QueryRunner, dbEvent: Event): Promise<void> {
    const re = this.getResultEvent(dbEvent);

@@ -131,10 +100,6 @@ export class Indexer implements IndexerInterface {
    }
  }

  async isUniswapContract (address: string): Promise<Contract | undefined> {
    return this._db.getContract(ethers.utils.getAddress(address));
  }

  async processEvent (event: Event): Promise<void> {
    const dbTx = await this._db.createTransactionRunner();

@@ -295,7 +260,7 @@ export class Indexer implements IndexerInterface {
  async position (blockHash: string, tokenId: string): Promise<any> {
    const nfpmContract = await this._db.getLatestContract('nfpm');
    assert(nfpmContract, 'No NFPM contract watched.');
    const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId));
    const { value, proof } = await this._baseIndexer.getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_positions', BigInt(tokenId));

    return {
      ...value,
@@ -306,7 +271,7 @@ export class Indexer implements IndexerInterface {
  async poolIdToPoolKey (blockHash: string, poolId: string): Promise<any> {
    const nfpmContract = await this._db.getLatestContract('nfpm');
    assert(nfpmContract, 'No NFPM contract watched.');
    const { value, proof } = await this._getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId));
    const { value, proof } = await this._baseIndexer.getStorageValue(nfpmStorageLayout, blockHash, nfpmContract.address, '_poolIdToPoolKey', BigInt(poolId));

    return {
      ...value,
@@ -317,7 +282,7 @@ export class Indexer implements IndexerInterface {
  async getPool (blockHash: string, token0: string, token1: string, fee: string): Promise<any> {
    const factoryContract = await this._db.getLatestContract('factory');
    assert(factoryContract, 'No Factory contract watched.');
    const { value, proof } = await this._getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee));
    const { value, proof } = await this._baseIndexer.getStorageValue(factoryStorageLayout, blockHash, factoryContract.address, 'getPool', token0, token1, BigInt(fee));

    return {
      pool: value,
@@ -330,6 +295,14 @@ export class Indexer implements IndexerInterface {
    return contract;
  }

  async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<Event>> {
    return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
  }

  async isWatchedContract (address: string): Promise<Contract | undefined> {
    return this._baseIndexer.isWatchedContract(address);
  }

  async saveEventEntity (dbEvent: Event): Promise<Event> {
    return this._baseIndexer.saveEventEntity(dbEvent);
  }
@@ -431,16 +404,19 @@ export class Indexer implements IndexerInterface {
      },
      transaction: {
        hash: txHash
      }
    },
    receiptCID,
    status
  } = logObj;

  if (status) {
    let eventName = UNKNOWN_EVENT_NAME;
    let eventInfo = {};
    const tx = transactionMap[txHash];
    const extraInfo = { topics, data, tx };

    const contract = ethers.utils.getAddress(address);
    const uniContract = await this.isUniswapContract(contract);
    const uniContract = await this.isWatchedContract(contract);

    if (uniContract) {
      const eventDetails = this.parseEventNameAndArgs(uniContract.kind, logObj);
@@ -458,13 +434,17 @@ export class Indexer implements IndexerInterface {
      proof: JSONbig.stringify({
        data: JSONbig.stringify({
          blockHash,
          receipt: {
            receiptCID,
            log: {
              cid,
              ipldBlock
            }
          })
        })
      });
    } else {
      log(`Skipping event for receipt ${receiptCID} due to failed transaction.`);
    }
  }

  const dbTx = await this._db.createTransactionRunner();
@@ -486,16 +466,4 @@ export class Indexer implements IndexerInterface {
      await dbTx.release();
    }
  }

  // TODO: Move into base/class or framework package.
  async _getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
    return getStorageValue(
      storageLayout,
      this._getStorageAt,
      blockHash,
      token,
      variable,
      ...mappingKeys
    );
  }
}
@@ -59,13 +59,13 @@ export class JobRunner {
    let dbEvent;
    const { data: { id } } = job;

    const uniContract = await this._indexer.isUniswapContract(event.contract);
    if (uniContract) {
    const watchedContract = await this._indexer.isWatchedContract(event.contract);
    if (watchedContract) {
      // We might not have parsed this event yet. This can happen if the contract was added
      // as a result of a previous event in the same block.
      if (event.eventName === UNKNOWN_EVENT_NAME) {
        const logObj = JSON.parse(event.extraInfo);
        const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(uniContract.kind, logObj);
        const { eventName, eventInfo } = this._indexer.parseEventNameAndArgs(watchedContract.kind, logObj);
        event.eventName = eventName;
        event.eventInfo = JSON.stringify(eventInfo);
        dbEvent = await this._indexer.saveEventEntity(event);
@@ -8,7 +8,6 @@ import debug from 'debug';

import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { UNKNOWN_EVENT_NAME } from './entity/Event';

const log = debug('vulcanize:resolver');

@@ -64,8 +63,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatcher): Promise<any> => {
      }

      const events = await indexer.getEventsByFilter(blockHash, contract, name);
      return events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME)
        .map(event => indexer.getResultEvent(event));
      return events.map(event => indexer.getResultEvent(event));
    },

    eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => {
@@ -124,7 +124,7 @@ describe('uni-watcher', () => {

    // Verifying with the db.
    const indexer = new Indexer(db, ethClient, postgraphileClient);
    assert(await indexer.isUniswapContract(factory.address), 'Factory contract not added to the database.');
    assert(await indexer.isWatchedContract(factory.address), 'Factory contract not added to the database.');
  });

  it('should deploy 2 tokens', async () => {
@@ -259,7 +259,7 @@ describe('uni-watcher', () => {

    // Verifying with the db.
    const indexer = new Indexer(db, ethClient, postgraphileClient);
    assert(await indexer.isUniswapContract(nfpm.address), 'NFPM contract not added to the database.');
    assert(await indexer.isWatchedContract(nfpm.address), 'NFPM contract not added to the database.');
  });

  it('should mint specified amount: nfpm', done => {
@@ -4,6 +4,7 @@
  "main": "dist/index.js",
  "license": "AGPL-3.0",
  "dependencies": {
    "@vulcanize/solidity-mapper": "^0.1.0",
    "debug": "^4.3.1",
    "ethers": "^5.2.0",
    "fs-extra": "^10.0.0",
@@ -12,3 +12,5 @@ export const JOB_KIND_INDEX = 'index';
export const JOB_KIND_PRUNE = 'prune';

export const DEFAULT_CONFIG_PATH = 'environments/local.toml';

export const UNKNOWN_EVENT_NAME = '__unknown__';
@@ -18,9 +18,8 @@ import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';

import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types';
import { MAX_REORG_DEPTH } from './constants';
import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants';

const UNKNOWN_EVENT_NAME = '__unknown__';
const DEFAULT_LIMIT = 100;
const DEFAULT_SKIP = 0;

@@ -180,12 +179,19 @@ export class Database {
    return repo.findOne(id, { relations: ['block'] });
  }

  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string): Promise<EventInterface[]> {
    return repo.createQueryBuilder('event')
      .innerJoinAndSelect('event.block', 'block')
      .where('block_hash = :blockHash', { blockHash })
      .addOrderBy('event.id', 'ASC')
      .getMany();
  async getBlockEvents (repo: Repository<EventInterface>, blockHash: string, where: FindConditions<EventInterface> = {}): Promise<EventInterface[]> {
    where.block = {
      ...where.block,
      blockHash
    };

    return repo.find({
      where,
      relations: ['block'],
      order: {
        id: 'ASC'
      }
    });
  }

  async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
@@ -540,6 +546,12 @@
    return { canonicalBlockNumber, blockHashes };
  }

  async getContract (repo: Repository<ContractInterface>, address: string): Promise<ContractInterface | undefined> {
    return repo.createQueryBuilder('contract')
      .where('address = :address', { address })
      .getOne();
  }

  async saveContract (repo: Repository<ContractInterface>, address: string, startingBlock: number, kind?: string): Promise<void> {
    const numRows = await repo
      .createQueryBuilder()
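With the `where` argument defaulting to `{}` and the block condition merged in, callers can push event filtering into the query instead of filtering in memory. A usage sketch (names outside the hunk are placeholders, typed loosely to keep it self-contained):

```typescript
import { Not } from 'typeorm';

// Mirrors the constant moved into the util constants above.
const UNKNOWN_EVENT_NAME = '__unknown__';

// `db` is the base Database and `eventRepo` an Event repository from the watcher wiring.
async function example (db: any, eventRepo: any, blockHash: string): Promise<void> {
  // All parsed (non-unknown) events in the block, ordered by event id.
  const named = await db.getBlockEvents(eventRepo, blockHash, { eventName: Not(UNKNOWN_EVENT_NAME) });

  // Only Transfer events emitted by a single contract.
  const transfers = await db.getBlockEvents(eventRepo, blockHash, {
    contract: '0x...',
    eventName: 'Transfer'
  });

  console.log(`named: ${named.length}, transfers: ${transfers.length}`);
}
```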
@@ -3,24 +3,36 @@
//

import assert from 'assert';
import { DeepPartial } from 'typeorm';
import { DeepPartial, FindConditions, Not } from 'typeorm';
import debug from 'debug';
import { ethers } from 'ethers';

import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt, getStorageValue, StorageLayout } from '@vulcanize/solidity-mapper';

import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface } from './types';
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types';
import { UNKNOWN_EVENT_NAME } from './constants';

const MAX_EVENTS_BLOCK_RANGE = 1000;

const log = debug('vulcanize:indexer');

export interface ValueResult {
  value: any;
  proof?: {
    data: string;
  }
}

export class Indexer {
  _db: DatabaseInterface;
  _ethClient: EthClient;
  _getStorageAt: GetStorageAt

  constructor (db: DatabaseInterface, ethClient: EthClient) {
    this._db = db;
    this._ethClient = ethClient;
    this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
  }

  async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
@@ -158,6 +170,32 @@
    return this._db.getBlockEvents(blockHash);
  }

  async getEventsByFilter (blockHash: string, contract: string, name: string | null): Promise<Array<EventInterface>> {
    if (contract) {
      const watchedContract = await this.isWatchedContract(contract);
      if (!watchedContract) {
        throw new Error('Not a watched contract');
      }
    }

    const where: FindConditions<EventInterface> = {
      eventName: Not(UNKNOWN_EVENT_NAME)
    };

    if (contract) {
      where.contract = contract;
    }

    if (name) {
      where.eventName = name;
    }

    const events = await this._db.getBlockEvents(blockHash, where);
    log(`getEvents: db hit, num events: ${events.length}`);

    return events;
  }

  async getAncestorAtDepth (blockHash: string, depth: number): Promise<string> {
    return this._db.getAncestorAtDepth(blockHash, depth);
  }
@@ -194,4 +232,21 @@

    return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
  }

  async isWatchedContract (address: string): Promise<ContractInterface | undefined> {
    assert(this._db.getContract);

    return this._db.getContract(ethers.utils.getAddress(address));
  }

  async getStorageValue (storageLayout: StorageLayout, blockHash: string, token: string, variable: string, ...mappingKeys: any[]): Promise<ValueResult> {
    return getStorageValue(
      storageLayout,
      this._getStorageAt,
      blockHash,
      token,
      variable,
      ...mappingKeys
    );
  }
}
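With the filter logic consolidated here, watcher-specific indexers reduce to one-line delegations (as in the uni-watcher hunk earlier). A usage sketch; `blockHash` and `factoryAddress` are placeholders from the surrounding watcher code:

```typescript
import { Indexer } from '@vulcanize/util';

async function fetchEvents (indexer: Indexer, blockHash: string, factoryAddress: string) {
  // Throws 'Not a watched contract' if factoryAddress is not in the contracts table.
  const poolCreated = await indexer.getEventsByFilter(blockHash, factoryAddress, 'PoolCreated');

  // Empty contract and null name: every parsed event in the block
  // (unknown events are excluded by the Not(UNKNOWN_EVENT_NAME) condition).
  const allParsed = await indexer.getEventsByFilter(blockHash, '', null);

  return { poolCreated, allParsed };
}
```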
@@ -43,7 +43,7 @@ export interface ContractInterface {
  id: number;
  address: string;
  startingBlock: number;
  kind?: string;
  kind: string;
}

export interface IndexerInterface {
@@ -72,7 +72,7 @@ export interface DatabaseInterface {
  createTransactionRunner(): Promise<QueryRunner>;
  getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
  getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>;
  getBlockEvents (blockHash: string): Promise<EventInterface[]>;
  getBlockEvents (blockHash: string, where?: FindConditions<EventInterface>): Promise<EventInterface[]>;
  getEvent (id: string): Promise<EventInterface | undefined>
  getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatusInterface | undefined>
  getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
@@ -86,4 +86,5 @@ export interface DatabaseInterface {
  saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
  saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
  removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void>;
  getContract?: (address: string) => Promise<ContractInterface | undefined>
}
@@ -17,6 +17,7 @@ then
createdb uni-watcher
createdb uni-info-watcher

psql -d erc20-watcher-job-queue -c "delete from pgboss.job;"
psql -d address-watcher-job-queue -c "delete from pgboss.job;"
psql -d uni-watcher-job-queue -c "delete from pgboss.job;"
psql -d uni-info-watcher-job-queue -c "delete from pgboss.job;"
66
yarn.lock
@@ -2644,7 +2644,12 @@
  dependencies:
    "@types/yargs-parser" "*"

"@types/zen-observable@^0.8.0", "@types/zen-observable@^0.8.2":
"@types/zen-observable@0.8.3":
  version "0.8.3"
  resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.3.tgz#781d360c282436494b32fe7d9f7f8e64b3118aa3"
  integrity sha512-fbF6oTd4sGGy0xjHPKAt+eS2CrxJ3+6gQ3FGcBoIJR2TLAyCkCyI8JqZNy+FeON0AhVgNJoUumVoZQjBFUqHkw==

"@types/zen-observable@^0.8.0":
  version "0.8.2"
  resolved "https://registry.yarnpkg.com/@types/zen-observable/-/zen-observable-0.8.2.tgz#808c9fa7e4517274ed555fa158f2de4b4f468e71"
  integrity sha512-HrCIVMLjE1MOozVoD86622S7aunluLb2PJdPfb3nYiEtohm8mIB/vyv0Fd37AdeMFrTUQXEunw78YloMA3Qilg==
@@ -4567,9 +4572,9 @@ chalk@^3.0.0:
    supports-color "^7.1.0"

chalk@^4.0.0, chalk@^4.1.0:
  version "4.1.1"
  resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.1.tgz#c80b3fab28bf6371e6863325eee67e618b77e6ad"
  integrity sha512-diHzdDKxcU+bAsUboHLPEDQiw0qEe0qd7SYUn3HgcFlWgbDcfLGswOHYeGrHKzG9z6UYf01d9VFMfZxPM1xZSg==
  version "4.1.2"
  resolved "https://registry.yarnpkg.com/chalk/-/chalk-4.1.2.tgz#aac4e2b7734a740867aeb16bf02aad556a1e7a01"
  integrity sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==
  dependencies:
    ansi-styles "^4.1.0"
    supports-color "^7.1.0"
@@ -4707,7 +4712,7 @@ cli-cursor@^3.1.0:
  dependencies:
    restore-cursor "^3.1.0"

cli-highlight@^2.1.10:
cli-highlight@^2.1.11:
  version "2.1.11"
  resolved "https://registry.yarnpkg.com/cli-highlight/-/cli-highlight-2.1.11.tgz#49736fa452f0aaf4fae580e30acb26828d2dc1bf"
  integrity sha512-9KDcoEVwyUXrjcJNvHD0NFc/hiwe/WPVYIleQh2O1N2Zro5gWJZ/K+3DGn8w8P/F6FxOgzyC5bxDyHIgCSPhGg==
@@ -5284,7 +5289,7 @@ debug@3.2.6:
  dependencies:
    ms "^2.1.1"

debug@4, debug@4.3.1, debug@^4.0.1, debug@^4.1.0, debug@^4.1.1, debug@^4.3.1:
debug@4, debug@4.3.1, debug@^4.0.1, debug@^4.1.0, debug@^4.1.1:
  version "4.3.1"
  resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.1.tgz#f0d229c505e0c6d8c49ac553d1b13dc183f6b2ee"
  integrity sha512-doEwdvm4PCeK4K3RQN2ZC2BYUBaxwLARCqZmMjtF8a51J2Rb0xpVloFRnCODwqjpwnAoao4pelN8l3RJdv3gRQ==
@@ -5298,7 +5303,7 @@ debug@^3.1.0, debug@^3.2.6, debug@^3.2.7:
  dependencies:
    ms "^2.1.1"

debug@^4.2.0:
debug@^4.2.0, debug@^4.3.1:
  version "4.3.2"
  resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.2.tgz#f0a49c18ac8779e31d4a0c6029dfb76873c7428b"
  integrity sha512-mOp8wKcvj7XxC78zLgw/ZA+6TSgkoE2C/ienthhRD298T7UNwAg9diBpLRxC0mOezLl4B0xV7M0cCO6P/O0Xhw==
@@ -6822,9 +6827,9 @@ fetch-ponyfill@^4.0.0:
    node-fetch "~1.7.1"

figlet@^1.1.1:
  version "1.5.0"
  resolved "https://registry.yarnpkg.com/figlet/-/figlet-1.5.0.tgz#2db4d00a584e5155a96080632db919213c3e003c"
  integrity sha512-ZQJM4aifMpz6H19AW1VqvZ7l4pOE9p7i/3LyxgO2kp+PO/VcDYNqIHEMtkccqIhTXMKci4kjueJr/iCQEaT/Ww==
  version "1.5.2"
  resolved "https://registry.yarnpkg.com/figlet/-/figlet-1.5.2.tgz#dda34ff233c9a48e36fcff6741aeb5bafe49b634"
  integrity sha512-WOn21V8AhyE1QqVfPIVxe3tupJacq1xGkPTB4iagT6o+P2cAgEOOwIxMftr4+ZCTI6d551ij9j61DFr0nsP2uQ==

figures@^3.0.0:
  version "3.2.0"
@@ -7782,9 +7787,9 @@ heap@0.2.6:
  integrity sha1-CH4fELBGky/IWU3Z5tN4r8nR5aw=

highlight.js@^10.7.1:
  version "10.7.2"
  resolved "https://registry.yarnpkg.com/highlight.js/-/highlight.js-10.7.2.tgz#89319b861edc66c48854ed1e6da21ea89f847360"
  integrity sha512-oFLl873u4usRM9K63j4ME9u3etNF0PLiJhSQ8rdfuL51Wn3zkD6drf9ZW0dOzjnZI22YYG24z30JcmfCZjMgYg==
  version "10.7.3"
  resolved "https://registry.yarnpkg.com/highlight.js/-/highlight.js-10.7.3.tgz#697272e3991356e40c3cac566a74eef681756531"
  integrity sha512-tzcUFauisWKNHaRkN4Wjl/ZA07gENAjFl3J/c480dprkGTg5EQstgaNFqBfUqCq54kZRIEcreTsAgF/m2quD7A==

hmac-drbg@^1.0.1:
  version "1.0.1"
@@ -13204,9 +13209,9 @@ tslib@^1.10.0, tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3:
  integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

tslib@^2.1.0:
  version "2.2.0"
  resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.2.0.tgz#fb2c475977e35e241311ede2693cee1ec6698f5c"
  integrity sha512-gS9GVHRU+RGn5KQM2rllAlR3dU6m7AcpJKdtH8gFvQiC4Otgk98XnmMU+nZenHt/+VhnBPWwgrJsyrdcw6i23w==
  version "2.3.1"
  resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.3.1.tgz#e8a335add5ceae51aa261d32a490158ef042ef01"
  integrity sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==

tsort@0.0.1:
  version "0.0.1"
@@ -13338,15 +13343,15 @@ typeorm-naming-strategies@^2.0.0:
  integrity sha512-nsJ5jDjhBBEG6olFmxojkO4yrW7hEv38sH7ZXWWx9wnDoo9uaoH/mo2mBYAh/VKgwoFHBLu+CYxGmzXz2GUMcA==

typeorm@^0.2.32:
  version "0.2.32"
  resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.32.tgz#544dbfdfe0cd0887548d9bcbd28527ea4f4b3c9b"
  integrity sha512-LOBZKZ9As3f8KRMPCUT2H0JZbZfWfkcUnO3w/1BFAbL/X9+cADTF6bczDGGaKVENJ3P8SaKheKmBgpt5h1x+EQ==
  version "0.2.37"
  resolved "https://registry.yarnpkg.com/typeorm/-/typeorm-0.2.37.tgz#1a5e59216077640694d27c04c99ed3f968d15dc8"
  integrity sha512-7rkW0yCgFC24I5T0f3S/twmLSuccPh1SQmxET/oDWn2sSDVzbyWdnItSdKy27CdJGTlKHYtUVeOcMYw5LRsXVw==
  dependencies:
    "@sqltools/formatter" "^1.2.2"
    app-root-path "^3.0.0"
    buffer "^6.0.3"
    chalk "^4.1.0"
    cli-highlight "^2.1.10"
    cli-highlight "^2.1.11"
    debug "^4.3.1"
    dotenv "^8.2.0"
    glob "^7.1.6"
@@ -13357,7 +13362,7 @@ typeorm@^0.2.32:
    tslib "^2.1.0"
    xml2js "^0.4.23"
    yargonaut "^1.1.4"
    yargs "^16.2.0"
    yargs "^17.0.1"
    zen-observable-ts "^1.0.0"

typescript@^4.3.2:
@@ -14319,7 +14324,12 @@ yargs-parser@^2.4.1:
    camelcase "^3.0.0"
    lodash.assign "^4.0.6"

yargs-parser@^20.2.2, yargs-parser@^20.2.3:
yargs-parser@^20.2.2:
  version "20.2.9"
  resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.9.tgz#2eb7dc3b0289718fc295f362753845c41a0c94ee"
  integrity sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==

yargs-parser@^20.2.3:
  version "20.2.7"
  resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.7.tgz#61df85c113edfb5a7a4e36eb8aa60ef423cbc90a"
  integrity sha512-FiNkvbeHzB/syOjIUxFDCnhSfzAL8R5vs40MgLFBorXACCOAEaWu0gRZl14vG8MR9AOJIZbmkjhusqBYZ3HTHw==
@@ -14437,14 +14447,14 @@ zen-observable-ts@^0.8.21:
    zen-observable "^0.8.0"

zen-observable-ts@^1.0.0:
  version "1.0.0"
  resolved "https://registry.yarnpkg.com/zen-observable-ts/-/zen-observable-ts-1.0.0.tgz#30d1202b81d8ba4c489e3781e8ca09abf0075e70"
  integrity sha512-KmWcbz+9kKUeAQ8btY8m1SsEFgBcp7h/Uf3V5quhan7ZWdjGsf0JcGLULQiwOZibbFWnHkYq8Nn2AZbJabovQg==
  version "1.1.0"
  resolved "https://registry.yarnpkg.com/zen-observable-ts/-/zen-observable-ts-1.1.0.tgz#2d1aa9d79b87058e9b75698b92791c1838551f83"
  integrity sha512-1h4zlLSqI2cRLPJUHJFL8bCWHhkpuXkF+dbGkRaWjgDIG26DmzyshUMrdV/rL3UnR+mhaX4fRq8LPouq0MYYIA==
  dependencies:
    "@types/zen-observable" "^0.8.2"
    zen-observable "^0.8.15"
    "@types/zen-observable" "0.8.3"
    zen-observable "0.8.15"

zen-observable@^0.8.0, zen-observable@^0.8.14, zen-observable@^0.8.15:
zen-observable@0.8.15, zen-observable@^0.8.0, zen-observable@^0.8.14:
  version "0.8.15"
  resolved "https://registry.yarnpkg.com/zen-observable/-/zen-observable-0.8.15.tgz#96415c512d8e3ffd920afd3889604e30b9eaac15"
  integrity sha512-PQ2PC7R9rslx84ndNBZB/Dkv8V8fZEpk83RLgXtYd0fwUgEjseMn1Dgajh2x6S8QbZAFa9p2qVCEuYZNgve0dQ==