Sqlite based index for ERC20 data (#31)

* Save/load balance and allowance data to/from RDBMS.

* Event entities.

* Pass artifacts to indexer.

* Refactoring, comments.

* Sync events to database.

* Absorb typeorm config file settings into our own config file.
This commit is contained in:
Ashwin Phatak 2021-06-03 17:31:21 +05:30 committed by GitHub
parent 23f9a9db41
commit 47ce52c112
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 438 additions and 93 deletions

View File

@ -51,22 +51,44 @@ yarn test
```text
{
balanceOf(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc") {
name(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
value
proof {
data
}
}
allowance(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc", spender: "0xCA6D29232D1435D8198E3E5302495417dD073d61") {
symbol(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
value
proof {
data
}
}
events(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
totalSupply(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
value
proof {
data
}
}
balanceOf(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc") {
value
proof {
data
}
}
allowance(blockHash: "0x81ed2b04af35b1b276281c37243212731202d5a191a27d07b22a605fd442998d", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc", spender: "0xCA6D29232D1435D8198E3E5302495417dD073d61") {
value
proof {
data
}
}
events(blockHash: "0x3441ba476dff95c58528afe754ceec659e0ef8ff1b59244ec4545f4f9784a51c", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
event {
__typename
... on TransferEvent {
from
to
@ -83,4 +105,5 @@ yarn test
}
}
}
```

View File

@ -2,6 +2,11 @@
host = "127.0.0.1"
port = 3001
[database]
type = "sqlite"
database = "out/indexer.db"
synchronize = true
logging = false
[upstream]
gqlEndpoint = "http://127.0.0.1:8083/graphql"

View File

@ -1,20 +0,0 @@
{
"type": "sqlite",
"database": "out/indexer.db",
"synchronize": true,
"logging": false,
"entities": [
"src/entity/**/*.ts"
],
"migrations": [
"src/migration/**/*.ts"
],
"subscribers": [
"src/subscriber/**/*.ts"
],
"cli": {
"entitiesDir": "src/entity",
"migrationsDir": "src/migration",
"subscribersDir": "src/subscriber"
}
}

View File

@ -0,0 +1,114 @@
import assert from "assert";
import { Connection, createConnection } from "typeorm";
import { Allowance } from "./entity/Allowance";
import { Balance } from "./entity/Balance";
import { Event } from "./entity/Event";
import { EventSyncProgress } from "./entity/EventProgress";
export class Database {
_config: any
_conn: Connection
constructor(config) {
assert(config);
this._config = config;
}
async init() {
assert(!this._conn);
this._conn = await createConnection(this._config);
}
async getBalance({ blockHash, token, owner }) {
return this._conn.getRepository(Balance)
.createQueryBuilder("balance")
.where("blockHash = :blockHash AND token = :token AND owner = :owner", {
blockHash,
token,
owner
})
.getOne();
}
async getAllowance({ blockHash, token, owner, spender }) {
return this._conn.getRepository(Allowance)
.createQueryBuilder("allowance")
.where("blockHash = :blockHash AND token = :token AND owner = :owner AND spender = :spender", {
blockHash,
token,
owner,
spender
})
.getOne();
}
async saveBalance({ blockHash, token, owner, value, proof }) {
const repo = this._conn.getRepository(Balance);
const entity = repo.create({ blockHash, token, owner, value, proof });
return repo.save(entity);
}
async saveAllowance({ blockHash, token, owner, spender, value, proof }) {
const repo = this._conn.getRepository(Allowance);
const entity = repo.create({ blockHash, token, owner, spender, value, proof });
return repo.save(entity);
}
// Returns true if events have already been synced for the (block, token) combination.
async didSyncEvents({ blockHash, token }) {
const numRows = await this._conn.getRepository(EventSyncProgress)
.createQueryBuilder()
.where("blockHash = :blockHash AND token = :token", {
blockHash,
token,
})
.getCount();
return numRows > 0;
}
async getEvents({ blockHash, token }) {
return this._conn.getRepository(Event)
.createQueryBuilder("event")
.where("blockHash = :blockHash AND token = :token", {
blockHash,
token,
})
.getMany();
}
async getEventsByName({ blockHash, token, eventName }) {
return this._conn.getRepository(Event)
.createQueryBuilder("event")
.where("blockHash = :blockHash AND token = :token AND :eventName = :eventName", {
blockHash,
token,
eventName
})
.getMany();
}
async saveEvents({ blockHash, token, events }) {
// TODO: Using the same connection doesn't work when > 1 inserts are attempted at the same time (e.g. simultaneous GQL requests).
// In a transaction:
// (1) Save all the events in the database.
// (2) Add an entry to the event progress table.
await this._conn.transaction(async (tx) => {
// Bulk insert events.
await tx.createQueryBuilder()
.insert()
.into(Event)
.values(events)
.execute();
// Update event sync progress.
const repo = tx.getRepository(EventSyncProgress);
const progress = repo.create({ blockHash, token });
await repo.save(progress);
});
}
}

View File

@ -0,0 +1,27 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
@Entity()
@Index(["blockHash", "token", "owner", "spender"], { unique: true })
export class Allowance {
@PrimaryGeneratedColumn()
id: number;
@Column("varchar", { length: 66 })
blockHash: string;
@Column("varchar", { length: 42 })
token: string;
@Column("varchar", { length: 42 })
owner: string;
@Column("varchar", { length: 42 })
spender: string;
@Column("numeric")
value: number;
@Column("text")
proof: string;
}

View File

@ -0,0 +1,24 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
@Entity()
@Index(["blockHash", "token", "owner"], { unique: true })
export class Balance {
@PrimaryGeneratedColumn()
id: number;
@Column("varchar", { length: 66 })
blockHash: string;
@Column("varchar", { length: 42 })
token: string;
@Column("varchar", { length: 42 })
owner: string;
@Column("numeric")
value: number;
@Column("text")
proof: string;
}

View File

@ -0,0 +1,46 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
@Entity()
// Index to query all events for a contract efficiently.
@Index(["blockHash", "token"])
// Index to query 'Transfer' events efficiently.
@Index(["blockHash", "token", "eventName", "transferFrom", "transferTo"])
// Index to query 'Approval' events efficiently.
@Index(["blockHash", "token", "eventName", "approvalOwner", "approvalSpender"])
export class Event {
@PrimaryGeneratedColumn()
id: number;
@Column("varchar", { length: 66 })
blockHash: string;
@Column("varchar", { length: 42 })
token: string;
@Column("varchar", { length: 256 })
eventName: string;
@Column("text")
proof: string;
// Transfer event columns.
@Column("varchar", { length: 42, nullable: true })
transferFrom: string;
@Column("varchar", { length: 42, nullable: true })
transferTo: string;
@Column("numeric", { nullable: true })
transferValue: number;
// Approval event columns.
@Column("varchar", { length: 42, nullable: true })
approvalOwner: string;
@Column("varchar", { length: 42, nullable: true })
approvalSpender: string;
@Column("numeric", { nullable: true })
approvalValue: number;
}

View File

@ -0,0 +1,21 @@
import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
// Stores a row if events for a (block, token) combination have already been fetched.
//
// Required as a particular block may not have events from a particular contract,
// and we need to differentiate between that case and the case where data hasn't
// yet been synced from upstream.
//
@Entity()
@Index(["blockHash", "token"], { unique: true })
export class EventSyncProgress {
@PrimaryGeneratedColumn()
id: number;
@Column("varchar", { length: 66 })
blockHash: string;
@Column("varchar", { length: 42 })
token: string;
}

View File

@ -1,29 +1,44 @@
import assert from "assert";
import debug from 'debug';
import { Connection } from "typeorm";
import { invert } from "lodash";
import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client";
import { getStorageInfo, getEventNameTopics, getStorageValue } from '@vulcanize/solidity-mapper';
import { storageLayout, abi } from './artifacts/ERC20.json';
import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client";
import { getStorageInfo, getEventNameTopics, getStorageValue, GetStorageAt } from '@vulcanize/solidity-mapper';
import { Database } from './database';
const log = debug('vulcanize:indexer');
export class Indexer {
_db: Connection
_db: Database
_ethClient: EthClient
_getStorageAt: GetStorageAt
constructor(db, ethClient) {
_abi: any
_storageLayout: any
constructor(db, ethClient, artifacts) {
assert(db);
assert(ethClient);
assert(artifacts);
const { abi, storageLayout } = artifacts;
assert(abi);
assert(storageLayout);
this._db = db;
this._ethClient = ethClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
this._abi = abi;
this._storageLayout = storageLayout;
}
async totalSupply(blockHash, token) {
const { slot } = getStorageInfo(storageLayout, '_totalSupply');
// TODO: Use getStorageValue when it supports uint256 values.
const { slot } = getStorageInfo(this._storageLayout, '_totalSupply');
const vars = {
blockHash,
@ -31,14 +46,23 @@ export class Indexer {
slot
};
const result = await this._ethClient.getStorageAt(vars);
const result = await this._getStorageAt(vars);
log(JSON.stringify(result, null, 2));
return result;
}
async getBalanceOf(blockHash, token, owner) {
const { slot: balancesSlot } = getStorageInfo(storageLayout, '_balances');
async balanceOf(blockHash, token, owner) {
const entity = await this._db.getBalance({ blockHash, token, owner });
if (entity) {
return {
value: entity.value,
proof: JSON.parse(entity.proof)
}
}
// TODO: Use getStorageValue when it supports mappings.
const { slot: balancesSlot } = getStorageInfo(this._storageLayout, '_balances');
const slot = getMappingSlot(balancesSlot, owner);
const vars = {
@ -47,14 +71,26 @@ export class Indexer {
slot
};
const result = await this._ethClient.getStorageAt(vars);
const result = await this._getStorageAt(vars);
log(JSON.stringify(result, null, 2));
const { value, proof } = result;
await this._db.saveBalance({ blockHash, token, owner, value, proof: JSON.stringify(proof) });
return result;
}
async getAllowance(blockHash, token, owner, spender) {
const { slot: allowancesSlot } = getStorageInfo(storageLayout, '_allowances');
async allowance(blockHash, token, owner, spender) {
const entity = await this._db.getAllowance({ blockHash, token, owner, spender });
if (entity) {
return {
value: entity.value,
proof: JSON.parse(entity.proof)
}
}
// TODO: Use getStorageValue when it supports nested mappings.
const { slot: allowancesSlot } = getStorageInfo(this._storageLayout, '_allowances');
const slot = getMappingSlot(getMappingSlot(allowancesSlot, owner), spender);
const vars = {
@ -63,20 +99,17 @@ export class Indexer {
slot
};
const result = await this._ethClient.getStorageAt(vars);
const result = await this._getStorageAt(vars);
log(JSON.stringify(result, null, 2));
const { value, proof } = result;
await this._db.saveAllowance({ blockHash, token, owner, spender, value, proof: JSON.stringify(proof) });
return result;
}
async name(blockHash, token) {
const result = await getStorageValue(
storageLayout,
this._ethClient.getStorageAt.bind(this._ethClient),
blockHash,
token,
'_name'
)
const result = await this._getStorageValue(blockHash, token, '_name');
log(JSON.stringify(result, null, 2));
@ -84,13 +117,7 @@ export class Indexer {
}
async symbol(blockHash, token) {
const result = await getStorageValue(
storageLayout,
this._ethClient.getStorageAt.bind(this._ethClient),
blockHash,
token,
'_symbol'
)
const result = await this._getStorageValue(blockHash, token, '_symbol');
log(JSON.stringify(result, null, 2));
@ -105,58 +132,114 @@ export class Indexer {
}
async getEvents(blockHash, token, name) {
const vars = {
blockHash,
contract: token
};
const didSyncEvents = await this._db.didSyncEvents({ blockHash, token });
if (!didSyncEvents) {
// Fetch and save events first and make a note in the event sync progress table.
await this._fetchAndSaveEvents({ blockHash, token });
log(`synced events for block ${blockHash} contract ${token}`);
}
const logs = await this._ethClient.getLogs(vars);
log(JSON.stringify(logs, null, 2));
assert(await this._db.didSyncEvents({ blockHash, token }));
const erc20EventNameTopics = getEventNameTopics(abi);
const gqlEventType = invert(erc20EventNameTopics);
const events = await this._db.getEvents({ blockHash, token });
return logs
.filter(e => !name || erc20EventNameTopics[name] === e.topics[0])
const result = events
// TODO: Filter using db WHERE condition when name is not empty.
.filter(event => !name || name === event.eventName)
.map(e => {
const [topic0, topic1, topic2] = e.topics;
const eventFields = {};
const eventName = gqlEventType[topic0];
const address1 = topictoAddress(topic1);
const address2 = topictoAddress(topic2);
const eventFields = { value: e.data };
switch (eventName) {
switch (e.eventName) {
case 'Transfer': {
eventFields['from'] = address1;
eventFields['to'] = address2;
eventFields['from'] = e.transferFrom;
eventFields['to'] = e.transferTo;
eventFields['value'] = e.transferValue;
break;
};
case 'Approval': {
eventFields['owner'] = address1;
eventFields['spender'] = address2;
eventFields['owner'] = e.approvalOwner;
eventFields['spender'] = e.approvalSpender;
eventFields['value'] = e.approvalValue;
break;
};
}
return {
event: {
__typename: `${eventName}Event`,
__typename: `${e.eventName}Event`,
...eventFields
},
proof: {
// TODO: Return proof only if requested.
data: JSON.stringify({
blockHash,
receipt: {
cid: e.cid,
ipldBlock: e.ipldBlock
}
})
}
// TODO: Return proof only if requested.
proof: JSON.parse(e.proof)
}
});
log(JSON.stringify(result, null, 2));
return result;
}
}
// TODO: Move into base/class or framework package.
async _getStorageValue(blockHash, token, variable) {
return getStorageValue(
this._storageLayout,
this._getStorageAt,
blockHash,
token,
variable
);
}
async _fetchAndSaveEvents({ blockHash, token }) {
const logs = await this._ethClient.getLogs({ blockHash, contract: token });
log(JSON.stringify(logs, null, 2));
const eventNameToTopic = getEventNameTopics(this._abi);
const logTopicToEventName = invert(eventNameToTopic);
const dbEvents = logs.map(log => {
const { topics, data: value, cid, ipldBlock } = log;
const [topic0, topic1, topic2] = topics;
const eventName = logTopicToEventName[topic0];
const address1 = topictoAddress(topic1);
const address2 = topictoAddress(topic2);
const event = {
blockHash,
token,
eventName,
proof: JSON.stringify({
data: JSON.stringify({
blockHash,
receipt: {
cid,
ipldBlock
}
})
}),
};
switch (eventName) {
case 'Transfer': {
event['transferFrom'] = address1;
event['transferTo'] = address2;
event['transferValue'] = value;
break;
};
case 'Approval': {
event['approvalOwner'] = address1;
event['approvalSpender'] = address2;
event['approvalValue'] = value;
break;
};
}
return event;
});
await this._db.saveEvents({ blockHash, token, events: dbEvents });
}
}

View File

@ -4,21 +4,43 @@ import debug from 'debug';
import fs from 'fs-extra';
import path from 'path';
import "reflect-metadata";
import { createConnection } from "typeorm";
import { getCache } from '@vulcanize/cache';
import { EthClient } from '@vulcanize/ipld-eth-client';
import artifacts from './artifacts/ERC20.json';
import { Indexer } from './indexer';
import { Database } from './database';
const log = debug('vulcanize:resolver');
export const createResolvers = async (config) => {
const ormConfig = JSON.parse(await fs.readFile(path.join(process.cwd(), "ormconfig.json")));
const db = await createConnection(ormConfig);
const { upstream, database } = config;
assert(database, 'Missing database config');
const ormConfig = {
...database,
entities: [
"src/entity/**/*.ts"
],
migrations: [
"src/migration/**/*.ts"
],
subscribers: [
"src/subscriber/**/*.ts"
],
cli: {
entitiesDir: "src/entity",
migrationsDir: "src/migration",
subscribersDir: "src/subscriber"
}
};
const db = new Database(ormConfig);
await db.init();
const { upstream } = config;
assert(upstream, 'Missing upstream config');
const { gqlEndpoint, cache: cacheConfig } = upstream;
@ -27,7 +49,7 @@ export const createResolvers = async (config) => {
const cache = await getCache(cacheConfig);
const ethClient = new EthClient({ gqlEndpoint, cache });
const indexer = new Indexer(db, ethClient);
const indexer = new Indexer(db, ethClient, artifacts);
return {
BigInt: new BigInt('bigInt'),
@ -51,12 +73,12 @@ export const createResolvers = async (config) => {
balanceOf: async (_, { blockHash, token, owner }) => {
log('balanceOf', blockHash, token, owner);
return indexer.getBalanceOf(blockHash, token, owner);
return indexer.balanceOf(blockHash, token, owner);
},
allowance: async (_, { blockHash, token, owner, spender }) => {
log('allowance', blockHash, token, owner, spender);
return indexer.getAllowance(blockHash, token, owner, spender);
return indexer.allowance(blockHash, token, owner, spender);
},
name: (_, { blockHash, token }) => {