Update eden subgraph build and watch subgraph contracts (#54)

* Update eden subgraph build

* Watch subgraph contracts in the job-runner

* Change eden subgraph startBlocks to original values
This commit is contained in:
prathamesh0 2021-11-18 16:16:37 +05:30 committed by nabarun
parent d76268e506
commit af259a32f0
9 changed files with 49 additions and 20 deletions

View File

@ -261,8 +261,10 @@ export class Indexer implements IndexerInterface {
const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint'); const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint');
// There should be an initial checkpoint at least. // There should be an initial checkpoint at least.
// Assumption: There should be no events for the contract at the starting block. // Return if initial checkpoint doesn't exist.
assert(checkpoint, 'Initial checkpoint doesn\'t exist'); if (!checkpoint) {
return;
}
// Check if the latest checkpoint is in the same block. // Check if the latest checkpoint is in the same block.
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.'); assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.');
@ -289,7 +291,7 @@ export class Indexer implements IndexerInterface {
const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint'); const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint');
if (!checkpointBlock) { if (!checkpointBlock) {
if (blockNumber === contract.startingBlock) { if (blockNumber >= contract.startingBlock) {
// Call initial checkpoint hook. // Call initial checkpoint hook.
await createInitialCheckpoint(this, contract.address, blockHash); await createInitialCheckpoint(this, contract.address, blockHash);
} }

View File

@ -146,6 +146,9 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer); graphWatcher.setIndexer(indexer);
await graphWatcher.init(); await graphWatcher.init();
// Watching all the contracts in the subgraph.
await graphWatcher.addContracts();
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -42,27 +42,35 @@ export class Database {
async getEntity<Entity> (entity: (new () => Entity) | string, id: string, blockHash: string): Promise<Entity | undefined> { async getEntity<Entity> (entity: (new () => Entity) | string, id: string, blockHash: string): Promise<Entity | undefined> {
const queryRunner = this._conn.createQueryRunner(); const queryRunner = this._conn.createQueryRunner();
const repo = queryRunner.manager.getRepository(entity);
const whereOptions: { [key: string]: any } = { id };
if (blockHash) { try {
whereOptions.blockHash = blockHash; const repo = queryRunner.manager.getRepository(entity);
}
const findOptions = { const whereOptions: { [key: string]: any } = { id };
where: whereOptions,
order: { if (blockHash) {
blockNumber: 'DESC' whereOptions.blockHash = blockHash;
} }
};
let entityData = await repo.findOne(findOptions as FindOneOptions<any>); const findOptions = {
where: whereOptions,
order: {
blockNumber: 'DESC'
}
};
if (!entityData && findOptions.where.blockHash) { let entityData = await repo.findOne(findOptions as FindOneOptions<any>);
entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
if (!entityData && findOptions.where.blockHash) {
entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions);
}
return entityData;
} catch (error) {
console.log(error);
} finally {
await queryRunner.release();
} }
return entityData;
} }
async saveEntity (entity: string, data: any): Promise<void> { async saveEntity (entity: string, data: any): Promise<void> {
@ -129,7 +137,7 @@ export class Database {
// Get blockNumber as _blockNumber and blockHash as _blockHash from the entityInstance (wasm). // Get blockNumber as _blockNumber and blockHash as _blockHash from the entityInstance (wasm).
if (['_blockNumber', '_blockHash'].includes(propertyName)) { if (['_blockNumber', '_blockHash'].includes(propertyName)) {
return fromEntityValue(instanceExports, entityInstance, propertyName); return fromEntityValue(instanceExports, entityInstance, propertyName.slice(1));
} }
return fromEntityValue(instanceExports, entityInstance, propertyName); return fromEntityValue(instanceExports, entityInstance, propertyName);

View File

@ -303,6 +303,9 @@ export const createBlock = async (instanceExports: any, blockData: Block): Promi
const authorPtr = await Address.zero(); const authorPtr = await Address.zero();
const sizePtr = await __newString('0');
const size = await BigInt.fromString(sizePtr);
// Missing fields from watcher in block data: // Missing fields from watcher in block data:
// author // author
// size // size
@ -320,7 +323,7 @@ export const createBlock = async (instanceExports: any, blockData: Block): Promi
blockTimestamp, blockTimestamp,
difficulty, difficulty,
totalDifficulty, totalDifficulty,
null size
); );
}; };

View File

@ -95,6 +95,16 @@ export class GraphWatcher {
}, {}); }, {});
} }
async addContracts () {
assert(this._indexer?.watchContract);
// Watching the contract(s).
for (const dataSource of this._dataSources) {
const { source: { address, startBlock }, name } = dataSource;
await this._indexer.watchContract(address, name, true, startBlock);
}
}
async handleEvent (eventData: any) { async handleEvent (eventData: any) {
const { contract, event, eventSignature, block, tx, eventIndex } = eventData; const { contract, event, eventSignature, block, tx, eventIndex } = eventData;
@ -147,6 +157,8 @@ export class GraphWatcher {
async handleBlock (blockHash: string) { async handleBlock (blockHash: string) {
const blockData = await getFullBlock(this._postgraphileClient, blockHash); const blockData = await getFullBlock(this._postgraphileClient, blockHash);
this._context.event.block = blockData;
// Call block handler(s) for each contract. // Call block handler(s) for each contract.
for (const dataSource of this._dataSources) { for (const dataSource of this._dataSources) {
// Check if block handler(s) are configured. // Check if block handler(s) are configured.

View File

@ -73,6 +73,7 @@ export interface IndexerInterface {
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>; isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
cacheContract?: (contract: ContractInterface) => void; cacheContract?: (contract: ContractInterface) => void;
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void> createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock?: number) => Promise<boolean>
} }
export interface EventWatcherInterface { export interface EventWatcherInterface {