Push address events to downstream subscribers (#85)

* Push address event to downstream subscribers.

* Get addresses from trace - tests and fixes.
This commit is contained in:
Ashwin Phatak 2021-06-22 14:04:48 +05:30 committed by GitHub
parent 816352c9ff
commit 2adc5e9c34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 275 additions and 37 deletions

View File

@ -47,9 +47,11 @@
},
"devDependencies": {
"@ethersproject/abi": "^5.3.0",
"@types/chai": "^4.2.19",
"@types/express": "^4.17.11",
"@types/fs-extra": "^9.0.11",
"@types/json-bigint": "^1.0.0",
"@types/mocha": "^8.2.2",
"@types/yargs": "^17.0.0",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",

View File

@ -60,16 +60,24 @@ export class Database {
}
async getTrace (txHash: string): Promise<Trace | undefined> {
return this._conn.getRepository(Trace)
.createQueryBuilder('trace')
.where('tx_hash = :txHash', { txHash })
.getOne();
const repo = this._conn.getRepository(Trace);
return repo.findOne({ where: { txHash } });
}
async saveTrace ({ txHash, blockNumber, blockHash, trace }: DeepPartial<Trace>): Promise<Trace> {
const repo = this._conn.getRepository(Trace);
const entity = repo.create({ txHash, blockNumber, blockHash, trace });
return repo.save(entity);
async saveTrace ({ txHash, blockNumber, blockHash, trace }: DeepPartial<Trace>): Promise<void> {
await this._conn.transaction(async (tx) => {
const repo = tx.getRepository(Trace);
const numRows = await repo
.createQueryBuilder()
.where('tx_hash = :txHash', { txHash })
.getCount();
if (numRows === 0) {
const entity = repo.create({ txHash, blockNumber, blockHash, trace });
await repo.save(entity);
}
});
}
async saveTraceEntity (trace: Trace): Promise<Trace> {

View File

@ -18,6 +18,6 @@ export class Trace {
@Column('text')
trace!: string;
@ManyToMany(() => Account, account => account.appearances, { cascade: ['insert'] })
@ManyToMany(() => Account, account => account.appearances, { eager: true, cascade: ['insert'] })
accounts: Account[]
}

View File

@ -2,29 +2,19 @@ import assert from 'assert';
import debug from 'debug';
import { ethers } from 'ethers';
import { PubSub } from 'apollo-server-express';
import _ from 'lodash';
import { EthClient } from '@vulcanize/ipld-eth-client';
import { GetStorageAt } from '@vulcanize/solidity-mapper';
import { TracingClient } from '@vulcanize/tracing-client';
import { addressesInTrace } from './util';
import { Database } from './database';
import { Trace } from './entity/Trace';
import { Account } from './entity/Account';
const log = debug('vulcanize:indexer');
const addressesIn = (obj: any): any => {
if (!obj) {
return [];
}
if (_.isArray(obj)) {
return _.map(obj, addressesIn);
}
return [obj.from, obj.to, ...addressesIn(obj.calls)];
};
const AddressEvent = 'address_event';
export class Indexer {
_db: Database
@ -46,8 +36,8 @@ export class Indexer {
this._tracingClient = tracingClient;
}
getEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator(['event']);
getAddressEventIterator (): AsyncIterator<any> {
return this._pubsub.asyncIterator([AddressEvent]);
}
async isWatchedAddress (address : string): Promise<boolean> {
@ -63,7 +53,39 @@ export class Indexer {
return true;
}
async traceTxAndIndexAppearances (txHash: string): Promise<any> {
async getTrace (txHash: string): Promise<Trace | undefined> {
return this._db.getTrace(txHash);
}
async publishAddressEventToSubscribers (txHash: string): Promise<void> {
const traceObj = await this._db.getTrace(txHash);
if (!traceObj) {
return;
}
const { blockNumber, blockHash, trace } = traceObj;
for (let i = 0; i < traceObj.accounts.length; i++) {
const account = traceObj.accounts[i];
log(`pushing tx ${txHash} event to GQL subscribers for address ${account.address}`);
// Publishing the event here will result in pushing the payload to GQL subscribers for `onAddressEvent(address)`.
await this._pubsub.publish(AddressEvent, {
onAddressEvent: {
address: account.address,
txTrace: {
txHash,
blockHash,
blockNumber,
trace
}
}
});
}
}
async traceTxAndIndexAppearances (txHash: string): Promise<Trace> {
let entity = await this._db.getTrace(txHash);
if (entity) {
log('traceTx: db hit');
@ -73,22 +95,20 @@ export class Indexer {
const tx = await this._tracingClient.getTx(txHash);
const trace = await this._tracingClient.getTxTrace(txHash, 'callTraceWithAddresses', '15s');
entity = await this._db.saveTrace({
await this._db.saveTrace({
txHash,
blockNumber: tx.blockNumber,
blockHash: tx.blockHash,
trace: JSON.stringify(trace)
});
entity = await this._db.getTrace(txHash);
assert(entity);
await this.indexAppearances(entity);
}
return {
txHash,
blockNumber: entity.blockNumber,
blockHash: entity.blockHash,
trace: entity.trace
};
return entity;
}
async getAppearances (address: string, fromBlockNumber: number, toBlockNumber: number): Promise<Trace[]> {
@ -97,9 +117,11 @@ export class Indexer {
async indexAppearances (trace: Trace): Promise<Trace> {
const traceObj = JSON.parse(trace.trace);
const addresses = _.uniq(_.compact(_.flattenDeep(addressesIn(traceObj)))).sort();
trace.accounts = _.map(addresses, address => {
// TODO: Check if tx has failed?
const addresses = addressesInTrace(traceObj);
trace.accounts = addresses.map((address: string) => {
assert(address);
const account = new Account();

View File

@ -1,5 +1,7 @@
import assert from 'assert';
import debug from 'debug';
import { withFilter } from 'apollo-server-express';
import { ethers } from 'ethers';
import { Indexer } from './indexer';
@ -22,12 +24,19 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
return {
Subscription: {
onAddressEvent: {
subscribe: () => indexer.getEventIterator()
subscribe: withFilter(
() => indexer.getAddressEventIterator(),
(payload: any, variables: any) => {
return payload.onAddressEvent.address === ethers.utils.getAddress(variables.address);
}
)
}
},
Mutation: {
watchAddress: (_: any, { address, startingBlock = 1 }: WatchAddressParams): Promise<boolean> => {
address = ethers.utils.getAddress(address);
log('watchAddress', address, startingBlock);
return indexer.watchAddress(address, startingBlock);
}
@ -35,13 +44,23 @@ export const createResolvers = async (indexer: Indexer): Promise<any> => {
Query: {
appearances: async (_: any, { address, fromBlockNumber, toBlockNumber }: AppearanceParams): Promise<any> => {
address = ethers.utils.getAddress(address);
log('appearances', address, fromBlockNumber, toBlockNumber);
return indexer.getAppearances(address, fromBlockNumber, toBlockNumber);
},
traceTx: async (_: any, { txHash }: { txHash: string }): Promise<any> => {
log('traceTx', txHash);
return indexer.traceTxAndIndexAppearances(txHash);
const { blockHash, blockNumber, trace } = await indexer.traceTxAndIndexAppearances(txHash);
return {
txHash,
blockNumber,
blockHash,
trace
};
}
}
};

View File

@ -46,8 +46,8 @@ type Query {
#
type Subscription {
# Watch for token events (at head of chain).
onAddressEvent: WatchedAddressEvent!
# Watch for address events (at head of chain).
onAddressEvent(address: String!): WatchedAddressEvent!
}
#

View File

@ -29,7 +29,9 @@ export class TxWatcher {
this._subscription = await this._ethClient.watchTransactions(async (value) => {
const { txHash, ethHeaderCidByHeaderId: { blockHash, blockNumber } } = _.get(value, 'data.listen.relatedNode');
log('watchTransaction', JSON.stringify({ txHash, blockHash, blockNumber }, null, 2));
await this._indexer.traceTxAndIndexAppearances(txHash);
await this._indexer.publishAddressEventToSubscribers(txHash);
});
}

View File

@ -0,0 +1,146 @@
import { describe, it } from 'mocha';
import { expect } from 'chai';
import { addressesInTrace } from './util';
describe('addressInTrace', () => {
it('should parse an empty trace', () => {
const addresses = addressesInTrace({});
expect(addresses).to.eql([]);
});
it('should parse an unnested trace', () => {
const addresses = addressesInTrace({
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61'
});
expect(addresses).to.eql([
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc'
]);
});
it('should parse an unnested trace with an addresses field', () => {
const addresses = addressesInTrace({
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
addresses: {
'0x9273D9437B0bf2F1b7999d8dB72960d6379564d1': {},
'0xd86fB467B78901310e9967A2C8B601A5E794c12C': {}
}
});
expect(addresses).to.eql([
'0x9273D9437B0bf2F1b7999d8dB72960d6379564d1',
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
'0xd86fB467B78901310e9967A2C8B601A5E794c12C'
]);
});
it('should parse a nested trace', () => {
const addresses = addressesInTrace({
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
calls: [{
from: '0x9273D9437B0bf2F1b7999d8dB72960d6379564d1',
to: '0xd86fB467B78901310e9967A2C8B601A5E794c12C'
},
{
from: '0xf29340ca4ad7A797dF2d67Be58d354EC284AE62f',
to: '0xEcFF6b14D3ed9569108b413f846279E64E39BC92'
}]
});
expect(addresses).to.eql([
'0x9273D9437B0bf2F1b7999d8dB72960d6379564d1',
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
'0xEcFF6b14D3ed9569108b413f846279E64E39BC92',
'0xd86fB467B78901310e9967A2C8B601A5E794c12C',
'0xf29340ca4ad7A797dF2d67Be58d354EC284AE62f'
]);
});
it('should parse a nested trace with an addresses field', () => {
const addresses = addressesInTrace({
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
calls: [{
from: '0x9273D9437B0bf2F1b7999d8dB72960d6379564d1',
to: '0xd86fB467B78901310e9967A2C8B601A5E794c12C',
addresses: {
'0xf29340ca4ad7A797dF2d67Be58d354EC284AE62f': {},
'0xEcFF6b14D3ed9569108b413f846279E64E39BC92': {}
}
}]
});
expect(addresses).to.eql([
'0x9273D9437B0bf2F1b7999d8dB72960d6379564d1',
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
'0xEcFF6b14D3ed9569108b413f846279E64E39BC92',
'0xd86fB467B78901310e9967A2C8B601A5E794c12C',
'0xf29340ca4ad7A797dF2d67Be58d354EC284AE62f'
]);
});
it('should not return duplicate addresses', () => {
const addresses = addressesInTrace({
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
calls: [{
from: '0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc',
to: '0xCA6D29232D1435D8198E3E5302495417dD073d61',
addresses: {
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc': {},
'0xCA6D29232D1435D8198E3E5302495417dD073d61': {}
}
}]
});
expect(addresses).to.eql([
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc'
]);
});
it('should return correct addresses for an ERC20 transfer', () => {
/* eslint-disable */
const trace = {
"type": "CALL",
"from": "0xdc7d7a8920c8eecc098da5b7522a5f31509b5bfc",
"to": "0x1ca7c995f8ef0a2989bbce08d5b7efe50a584aa1",
"value": "0x0",
"gas": "0x4edf",
"gasUsed": "0x3982",
"input": "0xa9059cbb000000000000000000000000ca6d29232d1435d8198e3e5302495417dd073d610000000000000000000000000000000000000000000000000de0b6b3a7640000",
"output": "0x0000000000000000000000000000000000000000000000000000000000000001",
"time": "66.609994ms",
"addresses": {
"0xca6d29232d1435d8198e3e5302495417dd073d61": {
"confidence": 1,
"opcodes": [
"CALLDATALOAD", "AND", "SWAP1", "DUP5", "DUP3", "AND", "DUP4", "POP", "DUP6", "AND", "AND", "DUP5", "AND", "AND", "DUP2", "AND", "POP", "SWAP2"
]
},
"0xdc7d7a8920c8eecc098da5b7522a5f31509b5bfc": {
"confidence": 1,
"opcodes": [
"CALLER", "POP", "JUMP", "JUMPDEST", "DUP4", "AND", "DUP4", "POP", "DUP8", "AND", "AND", "DUP6", "AND", "AND", "DUP4", "AND", "POP"
]
}
}
};
/* eslint-enable */
const addresses = addressesInTrace(trace);
expect(addresses).to.eql([
'0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1',
'0xCA6D29232D1435D8198E3E5302495417dD073d61',
'0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc'
]);
});
});

View File

@ -0,0 +1,30 @@
import _ from 'lodash';
import { ethers } from 'ethers';
export const addressesInTrace = (obj: any): any => {
return _.uniq(_.compact(_.flattenDeep(addressesIn(obj))))
.sort()
.map(address => ethers.utils.getAddress(<string>address));
};
const addressesIn = (obj: any): any => {
const addresses: any = [];
if (obj) {
addresses.push(obj.from);
addresses.push(obj.to);
if (obj.addresses) {
addresses.push(_.keys(obj.addresses));
}
if (obj.calls) {
obj.calls.forEach((call: any) => {
addresses.push(addressesIn(call));
});
}
}
return addresses;
};

View File

@ -314,6 +314,10 @@
if (result.error !== undefined && (result.error !== "execution reverted" || result.output ==="0x")) {
delete result.output;
}
if (this.callstack[0].addresses !== undefined) {
result.addresses = this.callstack[0].addresses;
}
return this.finalize(result);
},

View File

@ -2155,6 +2155,11 @@
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.2.18.tgz#0c8e298dbff8205e2266606c1ea5fbdba29b46e4"
integrity sha512-rS27+EkB/RE1Iz3u0XtVL5q36MGDWbgYe7zWiodyKNUnthxY0rukK5V36eiUCtCisB7NN8zKYH6DO2M37qxFEQ==
"@types/chai@^4.2.19":
version "4.2.19"
resolved "https://registry.yarnpkg.com/@types/chai/-/chai-4.2.19.tgz#80f286b515897413c7a35bdda069cc80f2344233"
integrity sha512-jRJgpRBuY+7izT7/WNXP/LsMO9YonsstuL+xuvycDyESpoDoIAsMd7suwpB4h9oEWB+ZlPTqJJ8EHomzNhwTPQ==
"@types/connect@*":
version "3.4.34"
resolved "https://registry.yarnpkg.com/@types/connect/-/connect-3.4.34.tgz#170a40223a6d666006d93ca128af2beb1d9b1901"