diff --git a/README.md b/README.md
index da9e333f..86f1a5e5 100644
--- a/README.md
+++ b/README.md
@@ -51,22 +51,44 @@ yarn test
 ```text
 {
-  balanceOf(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc") {
+  name(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
     value
     proof {
       data
     }
   }
 
-  allowance(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc", spender: "0xCA6D29232D1435D8198E3E5302495417dD073d61") {
+  symbol(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
     value
     proof {
       data
     }
   }
 
-  events(blockHash: "0x671e693ec3dccc948606db8d7e65ac3e16baea80a0dd6d56a126e07ccf85231f", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
+  totalSupply(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
+    value
+    proof {
+      data
+    }
+  }
+
+  balanceOf(blockHash: "0x5ef95c9847f15179b64fa57994355623f899aca097ad779421b8dff866a8b9c3", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc") {
+    value
+    proof {
+      data
+    }
+  }
+
+  allowance(blockHash: "0x81ed2b04af35b1b276281c37243212731202d5a191a27d07b22a605fd442998d", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1", owner: "0xDC7d7A8920C8Eecc098da5B7522a5F31509b5Bfc", spender: "0xCA6D29232D1435D8198E3E5302495417dD073d61") {
+    value
+    proof {
+      data
+    }
+  }
+
+  events(blockHash: "0x3441ba476dff95c58528afe754ceec659e0ef8ff1b59244ec4545f4f9784a51c", token: "0x1ca7c995f8eF0A2989BbcE08D5B7Efe50A584aa1") {
     event {
+      __typename
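+      # __typename distinguishes TransferEvent from ApprovalEvent results.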
       ... on TransferEvent {
         from
         to
@@ -83,4 +105,5 @@ yarn test
     }
   }
 }
+
 ```
diff --git a/packages/watcher/environments/local.toml b/packages/watcher/environments/local.toml
index 21c90626..37880a0a 100644
--- a/packages/watcher/environments/local.toml
+++ b/packages/watcher/environments/local.toml
@@ -2,6 +2,11 @@
   host = "127.0.0.1"
   port = 3001
 
+[database]
+  type = "sqlite"
+  database = "out/indexer.db"
+  synchronize = true
+  logging = false
 
 [upstream]
   gqlEndpoint = "http://127.0.0.1:8083/graphql"
diff --git a/packages/watcher/ormconfig.json b/packages/watcher/ormconfig.json
deleted file mode 100644
index 5dc44473..00000000
--- a/packages/watcher/ormconfig.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "type": "sqlite",
-  "database": "out/indexer.db",
-  "synchronize": true,
-  "logging": false,
-  "entities": [
-    "src/entity/**/*.ts"
-  ],
-  "migrations": [
-    "src/migration/**/*.ts"
-  ],
-  "subscribers": [
-    "src/subscriber/**/*.ts"
-  ],
-  "cli": {
-    "entitiesDir": "src/entity",
-    "migrationsDir": "src/migration",
-    "subscribersDir": "src/subscriber"
-  }
-}
\ No newline at end of file
diff --git a/packages/watcher/src/database.ts b/packages/watcher/src/database.ts
new file mode 100644
index 00000000..1a386e17
--- /dev/null
+++ b/packages/watcher/src/database.ts
@@ -0,0 +1,114 @@
+import assert from "assert";
+import { Connection, createConnection } from "typeorm";
+
+import { Allowance } from "./entity/Allowance";
+import { Balance } from "./entity/Balance";
+import { Event } from "./entity/Event";
+import { EventSyncProgress } from "./entity/EventProgress";
+
+export class Database {
+
+  _config: any
+  _conn: Connection
+
+  constructor(config) {
+    assert(config);
+    this._config = config;
+  }
+
+  async init() {
+    assert(!this._conn);
+    this._conn = await createConnection(this._config);
+  }
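+
+  // Usage (see createResolvers in resolvers.ts):
+  //
+  //   const db = new Database(ormConfig);
+  //   await db.init();
+  //   const balance = await db.getBalance({ blockHash, token, owner });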
+
+  async getBalance({ blockHash, token, owner }) {
+    return this._conn.getRepository(Balance)
+      .createQueryBuilder("balance")
+      .where("blockHash = :blockHash AND token = :token AND owner = :owner", {
+        blockHash,
+        token,
+        owner
+      })
+      .getOne();
+  }
+
+  async getAllowance({ blockHash, token, owner, spender }) {
+    return this._conn.getRepository(Allowance)
+      .createQueryBuilder("allowance")
+      .where("blockHash = :blockHash AND token = :token AND owner = :owner AND spender = :spender", {
+        blockHash,
+        token,
+        owner,
+        spender
+      })
+      .getOne();
+  }
+
+  async saveBalance({ blockHash, token, owner, value, proof }) {
+    const repo = this._conn.getRepository(Balance);
+    const entity = repo.create({ blockHash, token, owner, value, proof });
+    return repo.save(entity);
+  }
+
+  async saveAllowance({ blockHash, token, owner, spender, value, proof }) {
+    const repo = this._conn.getRepository(Allowance);
+    const entity = repo.create({ blockHash, token, owner, spender, value, proof });
+    return repo.save(entity);
+  }
+
+  // Returns true if events have already been synced for the (block, token) combination.
+  async didSyncEvents({ blockHash, token }) {
+    const numRows = await this._conn.getRepository(EventSyncProgress)
+      .createQueryBuilder()
+      .where("blockHash = :blockHash AND token = :token", {
+        blockHash,
+        token,
+      })
+      .getCount();
+
+    return numRows > 0;
+  }
+
+  async getEvents({ blockHash, token }) {
+    return this._conn.getRepository(Event)
+      .createQueryBuilder("event")
+      .where("blockHash = :blockHash AND token = :token", {
+        blockHash,
+        token,
+      })
+      .getMany();
+  }
+
+  async getEventsByName({ blockHash, token, eventName }) {
+    return this._conn.getRepository(Event)
+      .createQueryBuilder("event")
+      .where("blockHash = :blockHash AND token = :token AND eventName = :eventName", {
+        blockHash,
+        token,
+        eventName
+      })
+      .getMany();
+  }
+
+  async saveEvents({ blockHash, token, events }) {
+    // TODO: Using the same connection doesn't work when multiple inserts are attempted at the same time (e.g. simultaneous GQL requests).
+
+    // In a transaction:
+    // (1) Save all the events in the database.
+    // (2) Add an entry to the event progress table.
+    await this._conn.transaction(async (tx) => {
+      // Bulk insert events.
+      await tx.createQueryBuilder()
+        .insert()
+        .into(Event)
+        .values(events)
+        .execute();
+
+      // Update event sync progress.
+      const repo = tx.getRepository(EventSyncProgress);
+      const progress = repo.create({ blockHash, token });
+      await repo.save(progress);
+    });
+  }
+}
diff --git a/packages/watcher/src/entity/Allowance.ts b/packages/watcher/src/entity/Allowance.ts
new file mode 100644
index 00000000..6733b310
--- /dev/null
+++ b/packages/watcher/src/entity/Allowance.ts
@@ -0,0 +1,27 @@
+import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
+
+@Entity()
+@Index(["blockHash", "token", "owner", "spender"], { unique: true })
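+// One row caches one allowance lookup; the unique index prevents duplicate
+// rows for the same (blockHash, token, owner, spender) key.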
+@Index(["blockHash", "token", "eventName", "approvalOwner", "approvalSpender"]) +export class Event { + + @PrimaryGeneratedColumn() + id: number; + + @Column("varchar", { length: 66 }) + blockHash: string; + + @Column("varchar", { length: 42 }) + token: string; + + @Column("varchar", { length: 256 }) + eventName: string; + + @Column("text") + proof: string; + + // Transfer event columns. + @Column("varchar", { length: 42, nullable: true }) + transferFrom: string; + + @Column("varchar", { length: 42, nullable: true }) + transferTo: string; + + @Column("numeric", { nullable: true }) + transferValue: number; + + // Approval event columns. + @Column("varchar", { length: 42, nullable: true }) + approvalOwner: string; + + @Column("varchar", { length: 42, nullable: true }) + approvalSpender: string; + + @Column("numeric", { nullable: true }) + approvalValue: number; +} diff --git a/packages/watcher/src/entity/EventProgress.ts b/packages/watcher/src/entity/EventProgress.ts new file mode 100644 index 00000000..83484a39 --- /dev/null +++ b/packages/watcher/src/entity/EventProgress.ts @@ -0,0 +1,21 @@ +import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm"; + +// Stores a row if events for a (block, token) combination have already been fetched. +// +// Required as a particular block may not have events from a particular contract, +// and we need to differentiate between that case and the case where data hasn't +// yet been synced from upstream. +// +@Entity() +@Index(["blockHash", "token"], { unique: true }) +export class EventSyncProgress { + + @PrimaryGeneratedColumn() + id: number; + + @Column("varchar", { length: 66 }) + blockHash: string; + + @Column("varchar", { length: 42 }) + token: string; +} diff --git a/packages/watcher/src/indexer.ts b/packages/watcher/src/indexer.ts index f87b20e5..6dacd224 100644 --- a/packages/watcher/src/indexer.ts +++ b/packages/watcher/src/indexer.ts @@ -1,29 +1,44 @@ import assert from "assert"; import debug from 'debug'; -import { Connection } from "typeorm"; import { invert } from "lodash"; -import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client"; -import { getStorageInfo, getEventNameTopics, getStorageValue } from '@vulcanize/solidity-mapper'; -import { storageLayout, abi } from './artifacts/ERC20.json'; +import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client"; +import { getStorageInfo, getEventNameTopics, getStorageValue, GetStorageAt } from '@vulcanize/solidity-mapper'; + +import { Database } from './database'; const log = debug('vulcanize:indexer'); export class Indexer { - _db: Connection + _db: Database _ethClient: EthClient + _getStorageAt: GetStorageAt - constructor(db, ethClient) { + _abi: any + _storageLayout: any + + constructor(db, ethClient, artifacts) { assert(db); assert(ethClient); + assert(artifacts); + + const { abi, storageLayout } = artifacts; + + assert(abi); + assert(storageLayout); this._db = db; this._ethClient = ethClient; + this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); + + this._abi = abi; + this._storageLayout = storageLayout; } async totalSupply(blockHash, token) { - const { slot } = getStorageInfo(storageLayout, '_totalSupply'); + // TODO: Use getStorageValue when it supports uint256 values. 
+export class Event {
+
+  @PrimaryGeneratedColumn()
+  id: number;
+
+  @Column("varchar", { length: 66 })
+  blockHash: string;
+
+  @Column("varchar", { length: 42 })
+  token: string;
+
+  @Column("varchar", { length: 256 })
+  eventName: string;
+
+  @Column("text")
+  proof: string;
+
+  // Transfer event columns.
+  @Column("varchar", { length: 42, nullable: true })
+  transferFrom: string;
+
+  @Column("varchar", { length: 42, nullable: true })
+  transferTo: string;
+
+  @Column("numeric", { nullable: true })
+  transferValue: number;
+
+  // Approval event columns.
+  @Column("varchar", { length: 42, nullable: true })
+  approvalOwner: string;
+
+  @Column("varchar", { length: 42, nullable: true })
+  approvalSpender: string;
+
+  @Column("numeric", { nullable: true })
+  approvalValue: number;
+}
diff --git a/packages/watcher/src/entity/EventProgress.ts b/packages/watcher/src/entity/EventProgress.ts
new file mode 100644
index 00000000..83484a39
--- /dev/null
+++ b/packages/watcher/src/entity/EventProgress.ts
@@ -0,0 +1,21 @@
+import { Entity, PrimaryGeneratedColumn, Column, Index } from "typeorm";
+
+// Stores a row if events for a (block, token) combination have already been fetched.
+//
+// Required as a particular block may not have events from a particular contract,
+// and we need to differentiate between that case and the case where data hasn't
+// yet been synced from upstream.
+//
+@Entity()
+@Index(["blockHash", "token"], { unique: true })
+export class EventSyncProgress {
+
+  @PrimaryGeneratedColumn()
+  id: number;
+
+  @Column("varchar", { length: 66 })
+  blockHash: string;
+
+  @Column("varchar", { length: 42 })
+  token: string;
+}
diff --git a/packages/watcher/src/indexer.ts b/packages/watcher/src/indexer.ts
index f87b20e5..6dacd224 100644
--- a/packages/watcher/src/indexer.ts
+++ b/packages/watcher/src/indexer.ts
@@ -1,29 +1,44 @@
 import assert from "assert";
 import debug from 'debug';
-import { Connection } from "typeorm";
 import { invert } from "lodash";
 
-import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client";
-import { getStorageInfo, getEventNameTopics, getStorageValue } from '@vulcanize/solidity-mapper';
-import { storageLayout, abi } from './artifacts/ERC20.json';
+import { EthClient, getMappingSlot, topictoAddress } from "@vulcanize/ipld-eth-client";
+import { getStorageInfo, getEventNameTopics, getStorageValue, GetStorageAt } from '@vulcanize/solidity-mapper';
+
+import { Database } from './database';
 
 const log = debug('vulcanize:indexer');
 
 export class Indexer {
-  _db: Connection
+  _db: Database
   _ethClient: EthClient
+  _getStorageAt: GetStorageAt
 
-  constructor(db, ethClient) {
+  _abi: any
+  _storageLayout: any
+
+  constructor(db, ethClient, artifacts) {
     assert(db);
     assert(ethClient);
+    assert(artifacts);
+
+    const { abi, storageLayout } = artifacts;
+
+    assert(abi);
+    assert(storageLayout);
 
     this._db = db;
     this._ethClient = ethClient;
+    this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
+
+    this._abi = abi;
+    this._storageLayout = storageLayout;
   }
 
   async totalSupply(blockHash, token) {
-    const { slot } = getStorageInfo(storageLayout, '_totalSupply');
+    // TODO: Use getStorageValue when it supports uint256 values.
+    const { slot } = getStorageInfo(this._storageLayout, '_totalSupply');
 
     const vars = {
       blockHash,
@@ -31,14 +46,23 @@ export class Indexer {
       slot
     };
 
-    const result = await this._ethClient.getStorageAt(vars);
+    const result = await this._getStorageAt(vars);
     log(JSON.stringify(result, null, 2));
 
     return result;
   }
 
-  async getBalanceOf(blockHash, token, owner) {
-    const { slot: balancesSlot } = getStorageInfo(storageLayout, '_balances');
+  async balanceOf(blockHash, token, owner) {
+    const entity = await this._db.getBalance({ blockHash, token, owner });
+    if (entity) {
+      return {
+        value: entity.value,
+        proof: JSON.parse(entity.proof)
+      }
+    }
+
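+    // Not found in the database: fetch the storage value from the upstream
+    // node below, then cache it so repeat queries are served from the DB.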
+    // TODO: Use getStorageValue when it supports mappings.
+    const { slot: balancesSlot } = getStorageInfo(this._storageLayout, '_balances');
     const slot = getMappingSlot(balancesSlot, owner);
 
     const vars = {
@@ -47,14 +71,26 @@ export class Indexer {
       slot
     };
 
-    const result = await this._ethClient.getStorageAt(vars);
+    const result = await this._getStorageAt(vars);
     log(JSON.stringify(result, null, 2));
 
+    const { value, proof } = result;
+    await this._db.saveBalance({ blockHash, token, owner, value, proof: JSON.stringify(proof) });
+
     return result;
   }
 
-  async getAllowance(blockHash, token, owner, spender) {
-    const { slot: allowancesSlot } = getStorageInfo(storageLayout, '_allowances');
+  async allowance(blockHash, token, owner, spender) {
+    const entity = await this._db.getAllowance({ blockHash, token, owner, spender });
+    if (entity) {
+      return {
+        value: entity.value,
+        proof: JSON.parse(entity.proof)
+      }
+    }
+
+    // TODO: Use getStorageValue when it supports nested mappings.
+    const { slot: allowancesSlot } = getStorageInfo(this._storageLayout, '_allowances');
     const slot = getMappingSlot(getMappingSlot(allowancesSlot, owner), spender);
 
     const vars = {
@@ -63,20 +99,17 @@ export class Indexer {
       slot
     };
 
-    const result = await this._ethClient.getStorageAt(vars);
+    const result = await this._getStorageAt(vars);
     log(JSON.stringify(result, null, 2));
 
+    const { value, proof } = result;
+    await this._db.saveAllowance({ blockHash, token, owner, spender, value, proof: JSON.stringify(proof) });
+
     return result;
   }
 
   async name(blockHash, token) {
-    const result = await getStorageValue(
-      storageLayout,
-      this._ethClient.getStorageAt.bind(this._ethClient),
-      blockHash,
-      token,
-      '_name'
-    )
+    const result = await this._getStorageValue(blockHash, token, '_name');
 
     log(JSON.stringify(result, null, 2));
 
@@ -84,13 +117,7 @@ export class Indexer {
   }
 
   async symbol(blockHash, token) {
-    const result = await getStorageValue(
-      storageLayout,
-      this._ethClient.getStorageAt.bind(this._ethClient),
-      blockHash,
-      token,
-      '_symbol'
-    )
+    const result = await this._getStorageValue(blockHash, token, '_symbol');
 
     log(JSON.stringify(result, null, 2));
 
@@ -105,58 +132,114 @@ export class Indexer {
   }
 
   async getEvents(blockHash, token, name) {
-    const vars = {
-      blockHash,
-      contract: token
-    };
+    const didSyncEvents = await this._db.didSyncEvents({ blockHash, token });
+    if (!didSyncEvents) {
+      // Fetch and save events first and make a note in the event sync progress table.
+      await this._fetchAndSaveEvents({ blockHash, token });
+      log(`synced events for block ${blockHash} contract ${token}`);
+    }
 
-    const logs = await this._ethClient.getLogs(vars);
-    log(JSON.stringify(logs, null, 2));
+    assert(await this._db.didSyncEvents({ blockHash, token }));
 
-    const erc20EventNameTopics = getEventNameTopics(abi);
-    const gqlEventType = invert(erc20EventNameTopics);
+    const events = await this._db.getEvents({ blockHash, token });
 
-    return logs
-      .filter(e => !name || erc20EventNameTopics[name] === e.topics[0])
+    const result = events
+      // TODO: Filter using db WHERE condition when name is not empty.
+      .filter(event => !name || name === event.eventName)
       .map(e => {
-        const [topic0, topic1, topic2] = e.topics;
+        const eventFields = {};
 
-        const eventName = gqlEventType[topic0];
-        const address1 = topictoAddress(topic1);
-        const address2 = topictoAddress(topic2);
-
-        const eventFields = { value: e.data };
-
-        switch (eventName) {
+        switch (e.eventName) {
           case 'Transfer': {
-            eventFields['from'] = address1;
-            eventFields['to'] = address2;
+            eventFields['from'] = e.transferFrom;
+            eventFields['to'] = e.transferTo;
+            eventFields['value'] = e.transferValue;
             break;
           };
          case 'Approval': {
-            eventFields['owner'] = address1;
-            eventFields['spender'] = address2;
+            eventFields['owner'] = e.approvalOwner;
+            eventFields['spender'] = e.approvalSpender;
+            eventFields['value'] = e.approvalValue;
            break;
          };
        }
 
        return {
          event: {
-            __typename: `${eventName}Event`,
+            __typename: `${e.eventName}Event`,
            ...eventFields
          },
-          proof: {
-            // TODO: Return proof only if requested.
-            data: JSON.stringify({
-              blockHash,
-              receipt: {
-                cid: e.cid,
-                ipldBlock: e.ipldBlock
-              }
-            })
-          }
+          // TODO: Return proof only if requested.
+          proof: JSON.parse(e.proof)
        }
      });
+
+    log(JSON.stringify(result, null, 2));
+
+    return result;
   }
-}
+
+  // TODO: Move into a base class or framework package.
+  async _getStorageValue(blockHash, token, variable) {
+    return getStorageValue(
+      this._storageLayout,
+      this._getStorageAt,
+      blockHash,
+      token,
+      variable
+    );
+  }
+
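+  // Fetches raw logs for (blockHash, token) from the upstream endpoint,
+  // decodes the indexed topics into per-event columns, and persists them
+  // together with the sync progress marker in one transaction (Database.saveEvents).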
+  async _fetchAndSaveEvents({ blockHash, token }) {
+    const logs = await this._ethClient.getLogs({ blockHash, contract: token });
+    log(JSON.stringify(logs, null, 2));
+
+    const eventNameToTopic = getEventNameTopics(this._abi);
+    const logTopicToEventName = invert(eventNameToTopic);
+
+    // Note: the map parameter is named logEntry to avoid shadowing the debug logger above.
+    const dbEvents = logs.map(logEntry => {
+      const { topics, data: value, cid, ipldBlock } = logEntry;
+
+      const [topic0, topic1, topic2] = topics;
+
+      const eventName = logTopicToEventName[topic0];
+      const address1 = topictoAddress(topic1);
+      const address2 = topictoAddress(topic2);
+
+      const event = {
+        blockHash,
+        token,
+        eventName,
+
+        proof: JSON.stringify({
+          data: JSON.stringify({
+            blockHash,
+            receipt: {
+              cid,
+              ipldBlock
+            }
+          })
+        }),
+      };
+
+      switch (eventName) {
+        case 'Transfer': {
+          event['transferFrom'] = address1;
+          event['transferTo'] = address2;
+          event['transferValue'] = value;
+          break;
+        };
+        case 'Approval': {
+          event['approvalOwner'] = address1;
+          event['approvalSpender'] = address2;
+          event['approvalValue'] = value;
+          break;
+        };
+      }
+
+      return event;
+    });
+
+    await this._db.saveEvents({ blockHash, token, events: dbEvents });
+  }
+}
\ No newline at end of file
diff --git a/packages/watcher/src/resolvers.ts b/packages/watcher/src/resolvers.ts
index 6e519d9d..f14f1f0d 100644
--- a/packages/watcher/src/resolvers.ts
+++ b/packages/watcher/src/resolvers.ts
@@ -4,21 +4,43 @@ import debug from 'debug';
 import fs from 'fs-extra';
 import path from 'path';
 import "reflect-metadata";
-import { createConnection } from "typeorm";
 
 import { getCache } from '@vulcanize/cache';
 import { EthClient } from '@vulcanize/ipld-eth-client';
 
+import artifacts from './artifacts/ERC20.json';
 import { Indexer } from './indexer';
+import { Database } from './database';
 
 const log = debug('vulcanize:resolver');
 
 export const createResolvers = async (config) => {
-  const ormConfig = JSON.parse(await fs.readFile(path.join(process.cwd(), "ormconfig.json")));
-  const db = await createConnection(ormConfig);
+  const { upstream, database } = config;
+
+  assert(database, 'Missing database config');
+
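+  // Only connection settings come from the [database] section of the TOML
+  // config; the entity/migration/subscriber paths (formerly in ormconfig.json)
+  // are fixed in code below.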
+  const ormConfig = {
+    ...database,
+    entities: [
+      "src/entity/**/*.ts"
+    ],
+    migrations: [
+      "src/migration/**/*.ts"
+    ],
+    subscribers: [
+      "src/subscriber/**/*.ts"
+    ],
+    cli: {
+      entitiesDir: "src/entity",
+      migrationsDir: "src/migration",
+      subscribersDir: "src/subscriber"
+    }
+  };
+
+  const db = new Database(ormConfig);
+  await db.init();
 
-  const { upstream } = config;
   assert(upstream, 'Missing upstream config');
 
   const { gqlEndpoint, cache: cacheConfig } = upstream;
@@ -27,7 +49,7 @@ export const createResolvers = async (config) => {
   const cache = await getCache(cacheConfig);
   const ethClient = new EthClient({ gqlEndpoint, cache });
 
-  const indexer = new Indexer(db, ethClient);
+  const indexer = new Indexer(db, ethClient, artifacts);
 
   return {
     BigInt: new BigInt('bigInt'),
@@ -51,12 +73,12 @@ export const createResolvers = async (config) => {
 
     balanceOf: async (_, { blockHash, token, owner }) => {
       log('balanceOf', blockHash, token, owner);
-      return indexer.getBalanceOf(blockHash, token, owner);
+      return indexer.balanceOf(blockHash, token, owner);
     },
 
     allowance: async (_, { blockHash, token, owner, spender }) => {
      log('allowance', blockHash, token, owner, spender);
-      return indexer.getAllowance(blockHash, token, owner, spender);
+      return indexer.allowance(blockHash, token, owner, spender);
    },
 
    name: (_, { blockHash, token }) => {