mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-03-17 08:19:23 +00:00
Get missing fields in transaction data for subgraph event handlers (#102)
* Get missing fields in event transaction * Cache transaction data for event handlers in subgraph
This commit is contained in:
parent
561c2c9066
commit
9b1aa29afd
@ -42,9 +42,7 @@ export interface GraphData {
|
||||
}
|
||||
|
||||
export interface Context {
|
||||
event: {
|
||||
block?: Block
|
||||
}
|
||||
block?: Block
|
||||
}
|
||||
|
||||
const log = debug('vulcanize:graph-node');
|
||||
@ -71,8 +69,8 @@ export const instantiate = async (
|
||||
const entityName = __getString(entity);
|
||||
const entityId = __getString(id);
|
||||
|
||||
assert(context.event.block);
|
||||
const entityData = await database.getEntity(entityName, entityId, context.event.block.blockHash);
|
||||
assert(context.block);
|
||||
const entityData = await database.getEntity(entityName, entityId, context.block.blockHash);
|
||||
|
||||
if (!entityData) {
|
||||
return null;
|
||||
@ -91,8 +89,8 @@ export const instantiate = async (
|
||||
|
||||
const entityInstance = await Entity.wrap(data);
|
||||
|
||||
assert(context.event.block);
|
||||
let dbData = await database.fromGraphEntity(instanceExports, context.event.block, entityName, entityInstance);
|
||||
assert(context.block);
|
||||
let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
|
||||
await database.saveEntity(entityName, dbData);
|
||||
|
||||
// Resolve any field name conflicts in the dbData for auto-diff.
|
||||
@ -108,7 +106,7 @@ export const instantiate = async (
|
||||
// Create an auto-diff.
|
||||
assert(indexer.createDiffStaged);
|
||||
assert(dataSource?.address);
|
||||
await indexer.createDiffStaged(dataSource.address, context.event.block.blockHash, diffData);
|
||||
await indexer.createDiffStaged(dataSource.address, context.block.blockHash, diffData);
|
||||
},
|
||||
|
||||
'log.log': (level: number, msg: number) => {
|
||||
@ -161,10 +159,10 @@ export const instantiate = async (
|
||||
|
||||
functionParams = await Promise.all(functionParamsPromise);
|
||||
|
||||
assert(context.event.block);
|
||||
assert(context.block);
|
||||
|
||||
// TODO: Check for function overloading.
|
||||
let result = await contract[functionName](...functionParams, { blockTag: context.event.block.blockHash });
|
||||
let result = await contract[functionName](...functionParams, { blockTag: context.block.blockHash });
|
||||
|
||||
// Using function signature does not work.
|
||||
const { outputs } = contract.interface.getFunction(functionName);
|
||||
|
@ -26,14 +26,19 @@ export const DECIMAL128_PMIN = '1e-6143';
|
||||
// Maximum -ve decimal value.
|
||||
export const DECIMAL128_NMAX = '-1e-6143';
|
||||
|
||||
interface Transaction {
|
||||
export interface Transaction {
|
||||
hash: string;
|
||||
index: number;
|
||||
from: string;
|
||||
to: string;
|
||||
value: string;
|
||||
gasLimit: string;
|
||||
gasPrice: string;
|
||||
input: string;
|
||||
}
|
||||
|
||||
export interface Block {
|
||||
headerId: number;
|
||||
blockHash: string;
|
||||
blockNumber: string;
|
||||
timestamp: string;
|
||||
@ -231,16 +236,19 @@ export const createEvent = async (instanceExports: any, contractAddress: string,
|
||||
const txToStringPtr = await __newString(tx.to);
|
||||
const txTo = tx.to && await Address.fromString(txToStringPtr);
|
||||
|
||||
const txValuePtr = await BigInt.fromI32(0);
|
||||
const txGasLimitPtr = await BigInt.fromI32(0);
|
||||
const txGasPricePtr = await BigInt.fromI32(0);
|
||||
const txinputPtr = await Bytes.empty();
|
||||
const valueStringPtr = await __newString(tx.value);
|
||||
const txValuePtr = await BigInt.fromString(valueStringPtr);
|
||||
|
||||
const gasLimitStringPtr = await __newString(tx.gasLimit);
|
||||
const txGasLimitPtr = await BigInt.fromString(gasLimitStringPtr);
|
||||
|
||||
const gasPriceStringPtr = await __newString(tx.gasPrice);
|
||||
const txGasPricePtr = await BigInt.fromString(gasPriceStringPtr);
|
||||
|
||||
const inputStringPtr = await __newString(tx.input);
|
||||
const txInputByteArray = await ByteArray.fromHexString(inputStringPtr);
|
||||
const txInputPtr = await Bytes.fromByteArray(txInputByteArray);
|
||||
|
||||
// Missing fields from watcher in transaction data:
|
||||
// value
|
||||
// gasLimit
|
||||
// gasPrice
|
||||
// input
|
||||
const transaction = await ethereum.Transaction.__new(
|
||||
txHash,
|
||||
txIndex,
|
||||
@ -249,7 +257,7 @@ export const createEvent = async (instanceExports: any, contractAddress: string,
|
||||
txValuePtr,
|
||||
txGasLimitPtr,
|
||||
txGasPricePtr,
|
||||
txinputPtr
|
||||
txInputPtr
|
||||
);
|
||||
|
||||
const eventParamArrayPromise = inputs.map(async input => {
|
||||
|
@ -11,9 +11,9 @@ import { ContractInterface, utils, providers } from 'ethers';
|
||||
|
||||
import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { IndexerInterface, getFullBlock, BlockHeight, ServerConfig } from '@vulcanize/util';
|
||||
import { IndexerInterface, getFullBlock, BlockHeight, ServerConfig, getFullTransaction } from '@vulcanize/util';
|
||||
|
||||
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts } from './utils';
|
||||
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils';
|
||||
import { Context, GraphData, instantiate } from './loader';
|
||||
import { Database } from './database';
|
||||
|
||||
@ -34,10 +34,9 @@ export class GraphWatcher {
|
||||
_wasmRestartBlocksInterval: number;
|
||||
_dataSources: any[] = [];
|
||||
_dataSourceMap: { [key: string]: DataSource } = {};
|
||||
_transactionsMap: Map<string, Transaction> = new Map()
|
||||
|
||||
_context: Context = {
|
||||
event: {}
|
||||
}
|
||||
_context: Context = {}
|
||||
|
||||
constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, serverConfig: ServerConfig) {
|
||||
this._database = database;
|
||||
@ -119,12 +118,14 @@ export class GraphWatcher {
|
||||
}
|
||||
|
||||
async handleEvent (eventData: any) {
|
||||
const { contract, event, eventSignature, block, tx, eventIndex } = eventData;
|
||||
const { contract, event, eventSignature, block, tx: { hash: txHash }, eventIndex } = eventData;
|
||||
|
||||
// TODO: Use blockData fetched in handleBlock.
|
||||
const blockData = await getFullBlock(this._postgraphileClient, this._ethProvider, block.hash);
|
||||
if (!this._context.block) {
|
||||
this._context.block = await getFullBlock(this._postgraphileClient, this._ethProvider, block.hash);
|
||||
}
|
||||
|
||||
this._context.event.block = blockData;
|
||||
const blockData = this._context.block;
|
||||
assert(blockData);
|
||||
|
||||
// Get dataSource in subgraph yaml based on contract address.
|
||||
const dataSource = this._dataSources.find(dataSource => dataSource.source.address === contract);
|
||||
@ -157,6 +158,8 @@ export class GraphWatcher {
|
||||
|
||||
const eventFragment = contractInterface.getEvent(eventSignature);
|
||||
|
||||
const tx = await this._getTransactionData(blockData.headerId, txHash);
|
||||
|
||||
const data = {
|
||||
block: blockData,
|
||||
inputs: eventFragment.inputs,
|
||||
@ -174,7 +177,10 @@ export class GraphWatcher {
|
||||
async handleBlock (blockHash: string) {
|
||||
const blockData = await getFullBlock(this._postgraphileClient, this._ethProvider, blockHash);
|
||||
|
||||
this._context.event.block = blockData;
|
||||
this._context.block = blockData;
|
||||
|
||||
// Clear transactions map on handling new block.
|
||||
this._transactionsMap.clear();
|
||||
|
||||
// Call block handler(s) for each contract.
|
||||
for (const dataSource of this._dataSources) {
|
||||
@ -263,4 +269,18 @@ export class GraphWatcher {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async _getTransactionData (headerId: number, txHash: string): Promise<Transaction> {
|
||||
let transaction = this._transactionsMap.get(txHash);
|
||||
|
||||
if (transaction) {
|
||||
return transaction;
|
||||
}
|
||||
|
||||
transaction = await getFullTransaction(this._postgraphileClient, headerId, txHash);
|
||||
assert(transaction);
|
||||
this._transactionsMap.set(txHash, transaction);
|
||||
|
||||
return transaction;
|
||||
}
|
||||
}
|
||||
|
@ -92,6 +92,16 @@ export class EthClient {
|
||||
);
|
||||
}
|
||||
|
||||
async getFullTransaction ({ headerId, txHash }: { headerId: number, txHash: string }): Promise<any> {
|
||||
return this._graphqlClient.query(
|
||||
ethQueries.getFullTransaction,
|
||||
{
|
||||
headerId,
|
||||
txHash
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async getBlockByHash (blockHash?: string): Promise<any> {
|
||||
const { block } = await this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
|
||||
block.number = parseInt(block.number, 16);
|
||||
|
@ -86,6 +86,7 @@ export const getFullBlocks = gql`
|
||||
query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
|
||||
allEthHeaderCids(condition: { blockNumber: $blockNumber, blockHash: $blockHash }) {
|
||||
nodes {
|
||||
id
|
||||
cid
|
||||
blockNumber
|
||||
blockHash
|
||||
@ -106,6 +107,21 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
|
||||
}
|
||||
`;
|
||||
|
||||
export const getFullTransaction = gql`
|
||||
query ethTransactionCidByHeaderIdAndTxHash($headerId: Int!, $txHash: String!) {
|
||||
ethTransactionCidByHeaderIdAndTxHash(headerId: $headerId, txHash: $txHash) {
|
||||
cid
|
||||
txHash
|
||||
index
|
||||
src
|
||||
dst
|
||||
blockByMhKey {
|
||||
data
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
export const getBlockByHash = gql`
|
||||
query block($blockHash: Bytes32) {
|
||||
block(hash: $blockHash) {
|
||||
@ -158,6 +174,7 @@ export default {
|
||||
getBlockWithTransactions,
|
||||
getBlocks,
|
||||
getFullBlocks,
|
||||
getFullTransaction,
|
||||
getBlockByHash,
|
||||
subscribeBlocks,
|
||||
subscribeTransactions
|
||||
|
@ -55,3 +55,19 @@ export function decodeHeader (rlp : Uint8Array): any {
|
||||
export function decodeData (hexLiteral: string): Uint8Array {
|
||||
return Uint8Array.from(Buffer.from(hexLiteral.slice(2), 'hex'));
|
||||
}
|
||||
|
||||
export function decodeTransaction (rlp : Uint8Array): any {
|
||||
try {
|
||||
const data = utils.RLP.decode(rlp);
|
||||
|
||||
return {
|
||||
GasPrice: decodeInteger(data[1], BigInt(0)),
|
||||
GasLimit: decodeInteger(data[2], BigInt(0)),
|
||||
Amount: decodeInteger(data[4], BigInt(0)),
|
||||
Data: data[5]
|
||||
};
|
||||
} catch (error: any) {
|
||||
log(error);
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
@ -225,15 +225,15 @@ export class JobRunner {
|
||||
blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
|
||||
}
|
||||
|
||||
if (this._indexer.processBlock) {
|
||||
await this._indexer.processBlock(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
// Check if block has unprocessed events.
|
||||
if (blockProgress.numProcessedEvents < blockProgress.numEvents) {
|
||||
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
|
||||
}
|
||||
|
||||
if (this._indexer.processBlock) {
|
||||
await this._indexer.processBlock(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
|
||||
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
|
||||
}
|
||||
|
@ -194,6 +194,7 @@ export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.
|
||||
const { size } = await provider.send('eth_getBlockByHash', [blockHash, false]);
|
||||
|
||||
return {
|
||||
headerId: fullBlock.id,
|
||||
cid: fullBlock.cid,
|
||||
blockNumber: fullBlock.blockNumber,
|
||||
blockHash: fullBlock.blockHash,
|
||||
@ -211,3 +212,26 @@ export const getFullBlock = async (ethClient: EthClient, ethProvider: providers.
|
||||
size: BigInt(size).toString()
|
||||
};
|
||||
};
|
||||
|
||||
export const getFullTransaction = async (ethClient: EthClient, headerId: number, txHash: string): Promise<any> => {
|
||||
const {
|
||||
ethTransactionCidByHeaderIdAndTxHash: fullTx
|
||||
} = await ethClient.getFullTransaction({ headerId, txHash });
|
||||
|
||||
assert(fullTx.blockByMhKey);
|
||||
|
||||
// Decode the transaction data.
|
||||
const extraData = EthDecoder.decodeTransaction(EthDecoder.decodeData(fullTx.blockByMhKey.data));
|
||||
assert(extraData);
|
||||
|
||||
return {
|
||||
hash: txHash,
|
||||
from: fullTx.src,
|
||||
to: fullTx.dst,
|
||||
index: fullTx.index,
|
||||
value: extraData.Amount.toString(),
|
||||
gasLimit: extraData.GasLimit.toString(),
|
||||
gasPrice: extraData.GasPrice.toString(),
|
||||
input: extraData.Data
|
||||
};
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user