From a68f2770660a4a4954c9fa868df0e3ff33cbcd6a Mon Sep 17 00:00:00 2001 From: Matt K <1036969+mkrump@users.noreply.github.com> Date: Tue, 12 Dec 2017 15:55:26 -0600 Subject: [PATCH] Save Logs to DB (#100) * Save logs to database * Save multiple logs to db * Add block number parameter to `FindLogs` --- cmd/get_logs/main.go | 4 + .../1513029953_add_logs_table.down.sql | 1 + .../1513029953_add_logs_table.up.sql | 14 +++ db/schema.sql | 60 +++++++++ integration_test/contract_test.go | 11 +- pkg/core/log.go | 3 +- pkg/geth/geth_log_to_core_log.go | 7 +- pkg/geth/geth_log_to_core_log_test.go | 8 +- pkg/repositories/in_memory.go | 25 ++++ pkg/repositories/postgres.go | 115 +++++++++++++++-- pkg/repositories/postgres_test.go | 19 +++ pkg/repositories/repository.go | 2 + pkg/repositories/testing/helpers.go | 116 ++++++++++++++++++ 13 files changed, 366 insertions(+), 19 deletions(-) create mode 100644 db/migrations/1513029953_add_logs_table.down.sql create mode 100644 db/migrations/1513029953_add_logs_table.up.sql diff --git a/cmd/get_logs/main.go b/cmd/get_logs/main.go index 0de21e99..c5d5703d 100644 --- a/cmd/get_logs/main.go +++ b/cmd/get_logs/main.go @@ -16,18 +16,22 @@ func main() { contractHash := flag.String("contract-hash", "", "Contract hash to show summary") _blockNumber := flag.Int64("block-number", -1, "Block number of summary") flag.Parse() + config := cmd.LoadConfig(*environment) blockchain := geth.NewGethBlockchain(config.Client.IPCPath) + repository := cmd.LoadPostgres(config.Database, blockchain.Node()) blockNumber := cmd.RequestedBlockNumber(_blockNumber) logs, err := blockchain.GetLogs(core.Contract{Hash: *contractHash}, blockNumber) if err != nil { log.Fatalln(err) } + repository.CreateLogs(logs) for _, l := range logs { fmt.Println("\tAddress: ", l.Address) fmt.Println("\tTxHash: ", l.TxHash) fmt.Println("\tBlockNumber ", l.BlockNumber) + fmt.Println("\tIndex ", l.Index) fmt.Println("\tTopics: ") for i, topic := range l.Topics { fmt.Printf("\t\tTopic %d: %s\n", i, topic) diff --git a/db/migrations/1513029953_add_logs_table.down.sql b/db/migrations/1513029953_add_logs_table.down.sql new file mode 100644 index 00000000..c3866786 --- /dev/null +++ b/db/migrations/1513029953_add_logs_table.down.sql @@ -0,0 +1 @@ +DROP TABLE logs; \ No newline at end of file diff --git a/db/migrations/1513029953_add_logs_table.up.sql b/db/migrations/1513029953_add_logs_table.up.sql new file mode 100644 index 00000000..a0cd805d --- /dev/null +++ b/db/migrations/1513029953_add_logs_table.up.sql @@ -0,0 +1,14 @@ +CREATE TABLE logs ( + id SERIAL PRIMARY KEY, + block_number BIGINT, + address VARCHAR(66), + tx_hash VARCHAR(66), + index BIGINT, + topic0 VARCHAR(66), + topic1 VARCHAR(66), + topic2 VARCHAR(66), + topic3 VARCHAR(66), + data TEXT, + CONSTRAINT log_uc UNIQUE (block_number, index) +); + diff --git a/db/schema.sql b/db/schema.sql index 503eeceb..b50f6dc9 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -73,6 +73,43 @@ CREATE SEQUENCE blocks_id_seq ALTER SEQUENCE blocks_id_seq OWNED BY blocks.id; +-- +-- Name: logs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE logs ( + id integer NOT NULL, + block_number bigint, + address character varying(66), + tx_hash character varying(66), + index bigint, + topic0 character varying(66), + topic1 character varying(66), + topic2 character varying(66), + topic3 character varying(66), + data text +); + + +-- +-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE logs_id_seq + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE logs_id_seq OWNED BY logs.id; + + -- -- Name: nodes; Type: TABLE; Schema: public; Owner: - -- @@ -186,6 +223,13 @@ ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.cont ALTER TABLE ONLY blocks ALTER COLUMN id SET DEFAULT nextval('blocks_id_seq'::regclass); +-- +-- Name: logs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY logs ALTER COLUMN id SET DEFAULT nextval('logs_id_seq'::regclass); + + -- -- Name: nodes id; Type: DEFAULT; Schema: public; Owner: - -- @@ -223,6 +267,22 @@ ALTER TABLE ONLY watched_contracts ADD CONSTRAINT contract_hash_uc UNIQUE (contract_hash); +-- +-- Name: logs log_uc; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY logs + ADD CONSTRAINT log_uc UNIQUE (block_number, index); + + +-- +-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY logs + ADD CONSTRAINT logs_pkey PRIMARY KEY (id); + + -- -- Name: nodes node_uc; Type: CONSTRAINT; Schema: public; Owner: - -- diff --git a/integration_test/contract_test.go b/integration_test/contract_test.go index 319df901..c8377320 100644 --- a/integration_test/contract_test.go +++ b/integration_test/contract_test.go @@ -108,12 +108,13 @@ var _ = Describe("Reading contracts", func() { BlockNumber: 4703824, TxHash: "0xf896bfd1eb539d881a1a31102b78de9f25cd591bf1fe1924b86148c0b205fd5d", Address: "0xd26114cd6EE289AccF82350c8d8487fedB8A0C07", - Topics: []string{ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x000000000000000000000000fbb1b73c4f0bda4f67dca266ce6ef42f520fbb98", - "0x000000000000000000000000d26114cd6ee289accf82350c8d8487fedb8a0c07", + Topics: map[int]string{ + 0: "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", + 1: "0x000000000000000000000000fbb1b73c4f0bda4f67dca266ce6ef42f520fbb98", + 2: "0x000000000000000000000000d26114cd6ee289accf82350c8d8487fedb8a0c07", }, - Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"} + Index: 19, + Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"} config, _ := cfg.NewConfig("infura") blockchain := geth.NewGethBlockchain(config.Client.IPCPath) contract := testing.SampleContract() diff --git a/pkg/core/log.go b/pkg/core/log.go index 3e864fcf..9688b46c 100644 --- a/pkg/core/log.go +++ b/pkg/core/log.go @@ -4,6 +4,7 @@ type Log struct { BlockNumber int64 TxHash string Address string - Topics []string + Topics map[int]string + Index int64 Data string } diff --git a/pkg/geth/geth_log_to_core_log.go b/pkg/geth/geth_log_to_core_log.go index 5bdabd57..4a20e35b 100644 --- a/pkg/geth/geth_log_to_core_log.go +++ b/pkg/geth/geth_log_to_core_log.go @@ -8,9 +8,9 @@ import ( func GethLogToCoreLog(gethLog types.Log) core.Log { topics := gethLog.Topics - var hexTopics []string - for _, topic := range topics { - hexTopics = append(hexTopics, topic.Hex()) + var hexTopics = make(map[int]string) + for i, topic := range topics { + hexTopics[i] = topic.Hex() } return core.Log{ Address: gethLog.Address.Hex(), @@ -18,6 +18,7 @@ func GethLogToCoreLog(gethLog types.Log) core.Log { BlockNumber: int64(gethLog.BlockNumber), Topics: hexTopics, TxHash: gethLog.TxHash.Hex(), + Index: int64(gethLog.Index), Data: hexutil.Encode(gethLog.Data), } } diff --git a/pkg/geth/geth_log_to_core_log_test.go b/pkg/geth/geth_log_to_core_log_test.go index 4fe595d7..a2a29438 100644 --- a/pkg/geth/geth_log_to_core_log_test.go +++ b/pkg/geth/geth_log_to_core_log_test.go @@ -32,9 +32,10 @@ var _ = Describe("Conversion of GethLog to core.Log", func() { BlockNumber: int64(gethLog.BlockNumber), Data: hexutil.Encode(gethLog.Data), TxHash: gethLog.TxHash.Hex(), - Topics: []string{ - common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef").Hex(), - common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615").Hex(), + Index: 2, + Topics: map[int]string{ + 0: common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef").Hex(), + 1: common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615").Hex(), }, } @@ -43,6 +44,7 @@ var _ = Describe("Conversion of GethLog to core.Log", func() { Expect(coreLog.Address).To(Equal(expected.Address)) Expect(coreLog.BlockNumber).To(Equal(expected.BlockNumber)) Expect(coreLog.Data).To(Equal(expected.Data)) + Expect(coreLog.Index).To(Equal(expected.Index)) Expect(coreLog.Topics[0]).To(Equal(expected.Topics[0])) Expect(coreLog.Topics[1]).To(Equal(expected.Topics[1])) Expect(coreLog.TxHash).To(Equal(expected.TxHash)) diff --git a/pkg/repositories/in_memory.go b/pkg/repositories/in_memory.go index c4415f0c..9e0bac9c 100644 --- a/pkg/repositories/in_memory.go +++ b/pkg/repositories/in_memory.go @@ -1,12 +1,36 @@ package repositories import ( + "fmt" + "github.com/8thlight/vulcanizedb/pkg/core" ) type InMemory struct { blocks map[int64]*core.Block contracts map[string]*core.Contract + logs map[string][]core.Log +} + +func (repository *InMemory) CreateLogs(logs []core.Log) error { + for _, log := range logs { + key := fmt.Sprintf("%s%s", log.BlockNumber, log.Index) + var logs []core.Log + repository.logs[key] = append(logs, log) + } + return nil +} + +func (repository *InMemory) FindLogs(address string, blockNumber int64) []core.Log { + var matchingLogs []core.Log + for _, logs := range repository.logs { + for _, log := range logs { + if log.Address == address && log.BlockNumber == blockNumber { + matchingLogs = append(matchingLogs, log) + } + } + } + return matchingLogs } func (repository *InMemory) CreateContract(contract core.Contract) error { @@ -48,6 +72,7 @@ func NewInMemory() *InMemory { return &InMemory{ blocks: make(map[int64]*core.Block), contracts: make(map[string]*core.Contract), + logs: make(map[string][]core.Log), } } diff --git a/pkg/repositories/postgres.go b/pkg/repositories/postgres.go index 743979b2..0d59b033 100644 --- a/pkg/repositories/postgres.go +++ b/pkg/repositories/postgres.go @@ -25,6 +25,52 @@ var ( ErrUnableToSetNode = errors.New("postgres: unable to set node") ) +func (repository Postgres) CreateLogs(logs []core.Log) error { + tx, _ := repository.Db.BeginTx(context.Background(), nil) + for _, tlog := range logs { + _, err := tx.Exec( + `INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (index, block_number) + DO UPDATE + SET block_number = $1, + address = $2, + tx_hash = $3, + index = $4, + topic0 = $5, + topic1 = $6, + topic2 = $7, + topic3 = $8, + data = $9 + `, + tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data, + ) + if err != nil { + tx.Rollback() + return ErrDBInsertFailed + } + } + tx.Commit() + return nil +} + +func (repository Postgres) FindLogs(address string, blockNumber int64) []core.Log { + logRows, _ := repository.Db.Query( + `SELECT block_number, + address, + tx_hash, + index, + topic0, + topic1, + topic2, + topic3, + data + FROM logs + WHERE address = $1 AND block_number = $2 + ORDER BY block_number DESC`, address, blockNumber) + return repository.loadLogs(logRows) +} + func NewPostgres(databaseConfig config.Database, node core.Node) (Postgres, error) { connectString := config.DbConnectionString(databaseConfig) db, err := sqlx.Connect("postgres", connectString) @@ -86,9 +132,9 @@ func (repository Postgres) ContractExists(contractHash string) bool { func (repository Postgres) FindContract(contractHash string) *core.Contract { var hash string var abi string - row := repository.Db.QueryRow( + contract := repository.Db.QueryRow( `SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash) - err := row.Scan(&hash, &abi) + err := contract.Scan(&hash, &abi) if err == sql.ErrNoRows { return nil } @@ -118,9 +164,19 @@ func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highes func (repository Postgres) FindBlockByNumber(blockNumber int64) *core.Block { blockRows, _ := repository.Db.Query( - `SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash - FROM blocks - WHERE node_id = $1`, repository.nodeId) + `SELECT id, + block_number, + block_gaslimit, + block_gasused, + block_time, + block_difficulty, + block_hash, + block_nonce, + block_parenthash, + block_size, + uncle_hash + FROM blocks + WHERE node_id = $1`, repository.nodeId) var savedBlocks []core.Block for blockRows.Next() { savedBlock := repository.loadBlock(blockRows) @@ -189,7 +245,16 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block { var gasUsed float64 var uncleHash string blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash) - transactionRows, _ := repository.Db.Query(`SELECT tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value FROM transactions WHERE block_id = $1`, blockId) + transactionRows, _ := repository.Db.Query(` + SELECT tx_hash, + tx_nonce, + tx_to, + tx_from, + tx_gaslimit, + tx_gasprice, + tx_value + FROM transactions + WHERE block_id = $1`, blockId) transactions := repository.loadTransactions(transactionRows) return core.Block{ Difficulty: difficulty, @@ -206,6 +271,32 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block { } } +func (repository Postgres) loadLogs(logsRows *sql.Rows) []core.Log { + var logs []core.Log + for logsRows.Next() { + var blockNumber int64 + var address string + var txHash string + var index int64 + var data string + topics := make([]string, 4) + logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data) + log := core.Log{ + BlockNumber: blockNumber, + TxHash: txHash, + Address: address, + Index: index, + Data: data, + } + log.Topics = make(map[int]string) + for i, topic := range topics { + log.Topics[i] = topic + } + logs = append(logs, log) + } + return logs +} + func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Transaction { var transactions []core.Transaction for transactionRows.Next() { @@ -232,7 +323,17 @@ func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Tr } func (repository Postgres) addTransactions(contract core.Contract) core.Contract { - transactionRows, _ := repository.Db.Query(`SELECT tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value FROM transactions WHERE tx_to = $1 ORDER BY block_id DESC`, contract.Hash) + transactionRows, _ := repository.Db.Query(` + SELECT tx_hash, + tx_nonce, + tx_to, + tx_from, + tx_gaslimit, + tx_gasprice, + tx_value + FROM transactions + WHERE tx_to = $1 + ORDER BY block_id DESC`, contract.Hash) transactions := repository.loadTransactions(transactionRows) savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi} return savedContract diff --git a/pkg/repositories/postgres_test.go b/pkg/repositories/postgres_test.go index f5d7895f..dca3f37f 100644 --- a/pkg/repositories/postgres_test.go +++ b/pkg/repositories/postgres_test.go @@ -65,6 +65,25 @@ var _ = Describe("Postgres repository", func() { Expect(err).To(Equal(repositories.ErrUnableToSetNode)) }) + It("does not commit log if log is invalid", func() { + //badTxHash violates db tx_hash field length + badTxHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) + badLog := core.Log{ + Address: "x123", + BlockNumber: 1, + TxHash: badTxHash, + } + cfg, _ := config.NewConfig("private") + node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1} + repository, _ := repositories.NewPostgres(cfg.Database, node) + + err := repository.CreateLogs([]core.Log{badLog}) + savedBlock := repository.FindLogs("x123", 1) + + Expect(err).ToNot(BeNil()) + Expect(savedBlock).To(BeNil()) + }) + It("does not commit block or transactions if transaction is invalid", func() { //badHash violates db To field length badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) diff --git a/pkg/repositories/repository.go b/pkg/repositories/repository.go index 3a8b5095..51e0d81d 100644 --- a/pkg/repositories/repository.go +++ b/pkg/repositories/repository.go @@ -11,4 +11,6 @@ type Repository interface { CreateContract(contract core.Contract) error ContractExists(contractHash string) bool FindContract(contractHash string) *core.Contract + CreateLogs(log []core.Log) error + FindLogs(address string, blockNumber int64) []core.Log } diff --git a/pkg/repositories/testing/helpers.go b/pkg/repositories/testing/helpers.go index 6b93ffb6..c482be68 100644 --- a/pkg/repositories/testing/helpers.go +++ b/pkg/repositories/testing/helpers.go @@ -1,6 +1,8 @@ package testing import ( + "sort" + "github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/repositories" . "github.com/onsi/ginkgo" @@ -11,6 +13,7 @@ func ClearData(postgres repositories.Postgres) { postgres.Db.MustExec("DELETE FROM watched_contracts") postgres.Db.MustExec("DELETE FROM transactions") postgres.Db.MustExec("DELETE FROM blocks") + postgres.Db.MustExec("DELETE FROM logs") } func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.Repository) { @@ -294,4 +297,117 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories. }) }) + Describe("Saving logs", func() { + It("returns the log when it exists", func() { + repository.CreateLogs([]core.Log{{ + BlockNumber: 1, + Index: 0, + Address: "x123", + TxHash: "x456", + Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"}, + Data: "xabc", + }}, + ) + + log := repository.FindLogs("x123", 1) + + Expect(log).NotTo(BeNil()) + Expect(log[0].BlockNumber).To(Equal(int64(1))) + Expect(log[0].Address).To(Equal("x123")) + Expect(log[0].Index).To(Equal(int64(0))) + Expect(log[0].TxHash).To(Equal("x456")) + Expect(log[0].Topics[0]).To(Equal("x777")) + Expect(log[0].Topics[1]).To(Equal("x888")) + Expect(log[0].Topics[2]).To(Equal("x999")) + Expect(log[0].Data).To(Equal("xabc")) + }) + + It("returns nil if log does not exist", func() { + log := repository.FindLogs("x123", 1) + Expect(log).To(BeNil()) + }) + + It("updates the log when log with when log with same block number and index is already present", func() { + repository.CreateLogs([]core.Log{{ + BlockNumber: 1, + Index: 0, + Address: "x123", + TxHash: "x456", + Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"}, + Data: "xABC", + }, + }) + repository.CreateLogs([]core.Log{{ + BlockNumber: 1, + Index: 0, + Address: "x123", + TxHash: "x456", + Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"}, + Data: "xXYZ", + }, + }) + + log := repository.FindLogs("x123", 1) + Expect(log[0].Data).To(Equal("xXYZ")) + }) + + It("filters to the correct block number and address", func() { + repository.CreateLogs([]core.Log{{ + BlockNumber: 1, + Index: 0, + Address: "x123", + TxHash: "x456", + Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"}, + Data: "xabc", + }}, + ) + repository.CreateLogs([]core.Log{{ + BlockNumber: 1, + Index: 1, + Address: "x123", + TxHash: "x789", + Topics: map[int]string{0: "x111", 1: "x222", 2: "x333"}, + Data: "xdef", + }}, + ) + repository.CreateLogs([]core.Log{{ + BlockNumber: 2, + Index: 0, + Address: "x123", + TxHash: "x456", + Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"}, + Data: "xabc", + }}, + ) + + log := repository.FindLogs("x123", 1) + + type logIndex struct { + blockNumber int64 + Index int64 + } + var uniqueBlockNumbers []logIndex + for _, log := range log { + uniqueBlockNumbers = append(uniqueBlockNumbers, + logIndex{log.BlockNumber, log.Index}) + } + sort.Slice(uniqueBlockNumbers, func(i, j int) bool { + if uniqueBlockNumbers[i].blockNumber < uniqueBlockNumbers[j].blockNumber { + return true + } + if uniqueBlockNumbers[i].blockNumber > uniqueBlockNumbers[j].blockNumber { + return false + } + return uniqueBlockNumbers[i].Index < uniqueBlockNumbers[j].Index + }) + + Expect(log).NotTo(BeNil()) + Expect(len(log)).To(Equal(2)) + Expect(uniqueBlockNumbers).To(Equal( + []logIndex{ + {blockNumber: 1, Index: 0}, + {blockNumber: 1, Index: 1}}, + )) + }) + }) }