Save Logs to DB (#100)

* Save logs to database

* Save multiple logs to db

* Add block number parameter to `FindLogs`
This commit is contained in:
Matt K 2017-12-12 15:55:26 -06:00 committed by GitHub
parent 5e64283a12
commit a68f277066
13 changed files with 366 additions and 19 deletions

View File

@ -16,18 +16,22 @@ func main() {
contractHash := flag.String("contract-hash", "", "Contract hash to show summary")
_blockNumber := flag.Int64("block-number", -1, "Block number of summary")
flag.Parse()
config := cmd.LoadConfig(*environment)
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
blockNumber := cmd.RequestedBlockNumber(_blockNumber)
logs, err := blockchain.GetLogs(core.Contract{Hash: *contractHash}, blockNumber)
if err != nil {
log.Fatalln(err)
}
repository.CreateLogs(logs)
for _, l := range logs {
fmt.Println("\tAddress: ", l.Address)
fmt.Println("\tTxHash: ", l.TxHash)
fmt.Println("\tBlockNumber ", l.BlockNumber)
fmt.Println("\tIndex ", l.Index)
fmt.Println("\tTopics: ")
for i, topic := range l.Topics {
fmt.Printf("\t\tTopic %d: %s\n", i, topic)

View File

@ -0,0 +1 @@
DROP TABLE logs;

View File

@ -0,0 +1,14 @@
CREATE TABLE logs (
id SERIAL PRIMARY KEY,
block_number BIGINT,
address VARCHAR(66),
tx_hash VARCHAR(66),
index BIGINT,
topic0 VARCHAR(66),
topic1 VARCHAR(66),
topic2 VARCHAR(66),
topic3 VARCHAR(66),
data TEXT,
CONSTRAINT log_uc UNIQUE (block_number, index)
);

View File

@ -73,6 +73,43 @@ CREATE SEQUENCE blocks_id_seq
ALTER SEQUENCE blocks_id_seq OWNED BY blocks.id;
--
-- Name: logs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE logs (
id integer NOT NULL,
block_number bigint,
address character varying(66),
tx_hash character varying(66),
index bigint,
topic0 character varying(66),
topic1 character varying(66),
topic2 character varying(66),
topic3 character varying(66),
data text
);
--
-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE logs_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE logs_id_seq OWNED BY logs.id;
--
-- Name: nodes; Type: TABLE; Schema: public; Owner: -
--
@ -186,6 +223,13 @@ ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.cont
ALTER TABLE ONLY blocks ALTER COLUMN id SET DEFAULT nextval('blocks_id_seq'::regclass);
--
-- Name: logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY logs ALTER COLUMN id SET DEFAULT nextval('logs_id_seq'::regclass);
--
-- Name: nodes id; Type: DEFAULT; Schema: public; Owner: -
--
@ -223,6 +267,22 @@ ALTER TABLE ONLY watched_contracts
ADD CONSTRAINT contract_hash_uc UNIQUE (contract_hash);
--
-- Name: logs log_uc; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY logs
ADD CONSTRAINT log_uc UNIQUE (block_number, index);
--
-- Name: logs logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY logs
ADD CONSTRAINT logs_pkey PRIMARY KEY (id);
--
-- Name: nodes node_uc; Type: CONSTRAINT; Schema: public; Owner: -
--

View File

@ -108,12 +108,13 @@ var _ = Describe("Reading contracts", func() {
BlockNumber: 4703824,
TxHash: "0xf896bfd1eb539d881a1a31102b78de9f25cd591bf1fe1924b86148c0b205fd5d",
Address: "0xd26114cd6EE289AccF82350c8d8487fedB8A0C07",
Topics: []string{
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x000000000000000000000000fbb1b73c4f0bda4f67dca266ce6ef42f520fbb98",
"0x000000000000000000000000d26114cd6ee289accf82350c8d8487fedb8a0c07",
Topics: map[int]string{
0: "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
1: "0x000000000000000000000000fbb1b73c4f0bda4f67dca266ce6ef42f520fbb98",
2: "0x000000000000000000000000d26114cd6ee289accf82350c8d8487fedb8a0c07",
},
Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"}
Index: 19,
Data: "0x0000000000000000000000000000000000000000000000000c7d713b49da0000"}
config, _ := cfg.NewConfig("infura")
blockchain := geth.NewGethBlockchain(config.Client.IPCPath)
contract := testing.SampleContract()

View File

@ -4,6 +4,7 @@ type Log struct {
BlockNumber int64
TxHash string
Address string
Topics []string
Topics map[int]string
Index int64
Data string
}

View File

@ -8,9 +8,9 @@ import (
func GethLogToCoreLog(gethLog types.Log) core.Log {
topics := gethLog.Topics
var hexTopics []string
for _, topic := range topics {
hexTopics = append(hexTopics, topic.Hex())
var hexTopics = make(map[int]string)
for i, topic := range topics {
hexTopics[i] = topic.Hex()
}
return core.Log{
Address: gethLog.Address.Hex(),
@ -18,6 +18,7 @@ func GethLogToCoreLog(gethLog types.Log) core.Log {
BlockNumber: int64(gethLog.BlockNumber),
Topics: hexTopics,
TxHash: gethLog.TxHash.Hex(),
Index: int64(gethLog.Index),
Data: hexutil.Encode(gethLog.Data),
}
}

View File

@ -32,9 +32,10 @@ var _ = Describe("Conversion of GethLog to core.Log", func() {
BlockNumber: int64(gethLog.BlockNumber),
Data: hexutil.Encode(gethLog.Data),
TxHash: gethLog.TxHash.Hex(),
Topics: []string{
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef").Hex(),
common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615").Hex(),
Index: 2,
Topics: map[int]string{
0: common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef").Hex(),
1: common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615").Hex(),
},
}
@ -43,6 +44,7 @@ var _ = Describe("Conversion of GethLog to core.Log", func() {
Expect(coreLog.Address).To(Equal(expected.Address))
Expect(coreLog.BlockNumber).To(Equal(expected.BlockNumber))
Expect(coreLog.Data).To(Equal(expected.Data))
Expect(coreLog.Index).To(Equal(expected.Index))
Expect(coreLog.Topics[0]).To(Equal(expected.Topics[0]))
Expect(coreLog.Topics[1]).To(Equal(expected.Topics[1]))
Expect(coreLog.TxHash).To(Equal(expected.TxHash))

View File

@ -1,12 +1,36 @@
package repositories
import (
"fmt"
"github.com/8thlight/vulcanizedb/pkg/core"
)
type InMemory struct {
blocks map[int64]*core.Block
contracts map[string]*core.Contract
logs map[string][]core.Log
}
func (repository *InMemory) CreateLogs(logs []core.Log) error {
for _, log := range logs {
key := fmt.Sprintf("%s%s", log.BlockNumber, log.Index)
var logs []core.Log
repository.logs[key] = append(logs, log)
}
return nil
}
func (repository *InMemory) FindLogs(address string, blockNumber int64) []core.Log {
var matchingLogs []core.Log
for _, logs := range repository.logs {
for _, log := range logs {
if log.Address == address && log.BlockNumber == blockNumber {
matchingLogs = append(matchingLogs, log)
}
}
}
return matchingLogs
}
func (repository *InMemory) CreateContract(contract core.Contract) error {
@ -48,6 +72,7 @@ func NewInMemory() *InMemory {
return &InMemory{
blocks: make(map[int64]*core.Block),
contracts: make(map[string]*core.Contract),
logs: make(map[string][]core.Log),
}
}

View File

@ -25,6 +25,52 @@ var (
ErrUnableToSetNode = errors.New("postgres: unable to set node")
)
func (repository Postgres) CreateLogs(logs []core.Log) error {
tx, _ := repository.Db.BeginTx(context.Background(), nil)
for _, tlog := range logs {
_, err := tx.Exec(
`INSERT INTO logs (block_number, address, tx_hash, index, topic0, topic1, topic2, topic3, data)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (index, block_number)
DO UPDATE
SET block_number = $1,
address = $2,
tx_hash = $3,
index = $4,
topic0 = $5,
topic1 = $6,
topic2 = $7,
topic3 = $8,
data = $9
`,
tlog.BlockNumber, tlog.Address, tlog.TxHash, tlog.Index, tlog.Topics[0], tlog.Topics[1], tlog.Topics[2], tlog.Topics[3], tlog.Data,
)
if err != nil {
tx.Rollback()
return ErrDBInsertFailed
}
}
tx.Commit()
return nil
}
func (repository Postgres) FindLogs(address string, blockNumber int64) []core.Log {
logRows, _ := repository.Db.Query(
`SELECT block_number,
address,
tx_hash,
index,
topic0,
topic1,
topic2,
topic3,
data
FROM logs
WHERE address = $1 AND block_number = $2
ORDER BY block_number DESC`, address, blockNumber)
return repository.loadLogs(logRows)
}
func NewPostgres(databaseConfig config.Database, node core.Node) (Postgres, error) {
connectString := config.DbConnectionString(databaseConfig)
db, err := sqlx.Connect("postgres", connectString)
@ -86,9 +132,9 @@ func (repository Postgres) ContractExists(contractHash string) bool {
func (repository Postgres) FindContract(contractHash string) *core.Contract {
var hash string
var abi string
row := repository.Db.QueryRow(
contract := repository.Db.QueryRow(
`SELECT contract_hash, contract_abi FROM watched_contracts WHERE contract_hash=$1`, contractHash)
err := row.Scan(&hash, &abi)
err := contract.Scan(&hash, &abi)
if err == sql.ErrNoRows {
return nil
}
@ -118,9 +164,19 @@ func (repository Postgres) MissingBlockNumbers(startingBlockNumber int64, highes
func (repository Postgres) FindBlockByNumber(blockNumber int64) *core.Block {
blockRows, _ := repository.Db.Query(
`SELECT id, block_number, block_gaslimit, block_gasused, block_time, block_difficulty, block_hash, block_nonce, block_parenthash, block_size, uncle_hash
FROM blocks
WHERE node_id = $1`, repository.nodeId)
`SELECT id,
block_number,
block_gaslimit,
block_gasused,
block_time,
block_difficulty,
block_hash,
block_nonce,
block_parenthash,
block_size,
uncle_hash
FROM blocks
WHERE node_id = $1`, repository.nodeId)
var savedBlocks []core.Block
for blockRows.Next() {
savedBlock := repository.loadBlock(blockRows)
@ -189,7 +245,16 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
var gasUsed float64
var uncleHash string
blockRows.Scan(&blockId, &blockNumber, &gasLimit, &gasUsed, &blockTime, &difficulty, &blockHash, &blockNonce, &blockParentHash, &blockSize, &uncleHash)
transactionRows, _ := repository.Db.Query(`SELECT tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value FROM transactions WHERE block_id = $1`, blockId)
transactionRows, _ := repository.Db.Query(`
SELECT tx_hash,
tx_nonce,
tx_to,
tx_from,
tx_gaslimit,
tx_gasprice,
tx_value
FROM transactions
WHERE block_id = $1`, blockId)
transactions := repository.loadTransactions(transactionRows)
return core.Block{
Difficulty: difficulty,
@ -206,6 +271,32 @@ func (repository Postgres) loadBlock(blockRows *sql.Rows) core.Block {
}
}
func (repository Postgres) loadLogs(logsRows *sql.Rows) []core.Log {
var logs []core.Log
for logsRows.Next() {
var blockNumber int64
var address string
var txHash string
var index int64
var data string
topics := make([]string, 4)
logsRows.Scan(&blockNumber, &address, &txHash, &index, &topics[0], &topics[1], &topics[2], &topics[3], &data)
log := core.Log{
BlockNumber: blockNumber,
TxHash: txHash,
Address: address,
Index: index,
Data: data,
}
log.Topics = make(map[int]string)
for i, topic := range topics {
log.Topics[i] = topic
}
logs = append(logs, log)
}
return logs
}
func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Transaction {
var transactions []core.Transaction
for transactionRows.Next() {
@ -232,7 +323,17 @@ func (repository Postgres) loadTransactions(transactionRows *sql.Rows) []core.Tr
}
func (repository Postgres) addTransactions(contract core.Contract) core.Contract {
transactionRows, _ := repository.Db.Query(`SELECT tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value FROM transactions WHERE tx_to = $1 ORDER BY block_id DESC`, contract.Hash)
transactionRows, _ := repository.Db.Query(`
SELECT tx_hash,
tx_nonce,
tx_to,
tx_from,
tx_gaslimit,
tx_gasprice,
tx_value
FROM transactions
WHERE tx_to = $1
ORDER BY block_id DESC`, contract.Hash)
transactions := repository.loadTransactions(transactionRows)
savedContract := core.Contract{Hash: contract.Hash, Transactions: transactions, Abi: contract.Abi}
return savedContract

View File

@ -65,6 +65,25 @@ var _ = Describe("Postgres repository", func() {
Expect(err).To(Equal(repositories.ErrUnableToSetNode))
})
It("does not commit log if log is invalid", func() {
//badTxHash violates db tx_hash field length
badTxHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badLog := core.Log{
Address: "x123",
BlockNumber: 1,
TxHash: badTxHash,
}
cfg, _ := config.NewConfig("private")
node := core.Node{GenesisBlock: "GENESIS", NetworkId: 1}
repository, _ := repositories.NewPostgres(cfg.Database, node)
err := repository.CreateLogs([]core.Log{badLog})
savedBlock := repository.FindLogs("x123", 1)
Expect(err).ToNot(BeNil())
Expect(savedBlock).To(BeNil())
})
It("does not commit block or transactions if transaction is invalid", func() {
//badHash violates db To field length
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))

View File

@ -11,4 +11,6 @@ type Repository interface {
CreateContract(contract core.Contract) error
ContractExists(contractHash string) bool
FindContract(contractHash string) *core.Contract
CreateLogs(log []core.Log) error
FindLogs(address string, blockNumber int64) []core.Log
}

View File

@ -1,6 +1,8 @@
package testing
import (
"sort"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/repositories"
. "github.com/onsi/ginkgo"
@ -11,6 +13,7 @@ func ClearData(postgres repositories.Postgres) {
postgres.Db.MustExec("DELETE FROM watched_contracts")
postgres.Db.MustExec("DELETE FROM transactions")
postgres.Db.MustExec("DELETE FROM blocks")
postgres.Db.MustExec("DELETE FROM logs")
}
func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.Repository) {
@ -294,4 +297,117 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
})
})
Describe("Saving logs", func() {
It("returns the log when it exists", func() {
repository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
TxHash: "x456",
Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"},
Data: "xabc",
}},
)
log := repository.FindLogs("x123", 1)
Expect(log).NotTo(BeNil())
Expect(log[0].BlockNumber).To(Equal(int64(1)))
Expect(log[0].Address).To(Equal("x123"))
Expect(log[0].Index).To(Equal(int64(0)))
Expect(log[0].TxHash).To(Equal("x456"))
Expect(log[0].Topics[0]).To(Equal("x777"))
Expect(log[0].Topics[1]).To(Equal("x888"))
Expect(log[0].Topics[2]).To(Equal("x999"))
Expect(log[0].Data).To(Equal("xabc"))
})
It("returns nil if log does not exist", func() {
log := repository.FindLogs("x123", 1)
Expect(log).To(BeNil())
})
It("updates the log when log with when log with same block number and index is already present", func() {
repository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
TxHash: "x456",
Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"},
Data: "xABC",
},
})
repository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
TxHash: "x456",
Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"},
Data: "xXYZ",
},
})
log := repository.FindLogs("x123", 1)
Expect(log[0].Data).To(Equal("xXYZ"))
})
It("filters to the correct block number and address", func() {
repository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 0,
Address: "x123",
TxHash: "x456",
Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"},
Data: "xabc",
}},
)
repository.CreateLogs([]core.Log{{
BlockNumber: 1,
Index: 1,
Address: "x123",
TxHash: "x789",
Topics: map[int]string{0: "x111", 1: "x222", 2: "x333"},
Data: "xdef",
}},
)
repository.CreateLogs([]core.Log{{
BlockNumber: 2,
Index: 0,
Address: "x123",
TxHash: "x456",
Topics: map[int]string{0: "x777", 1: "x888", 2: "x999"},
Data: "xabc",
}},
)
log := repository.FindLogs("x123", 1)
type logIndex struct {
blockNumber int64
Index int64
}
var uniqueBlockNumbers []logIndex
for _, log := range log {
uniqueBlockNumbers = append(uniqueBlockNumbers,
logIndex{log.BlockNumber, log.Index})
}
sort.Slice(uniqueBlockNumbers, func(i, j int) bool {
if uniqueBlockNumbers[i].blockNumber < uniqueBlockNumbers[j].blockNumber {
return true
}
if uniqueBlockNumbers[i].blockNumber > uniqueBlockNumbers[j].blockNumber {
return false
}
return uniqueBlockNumbers[i].Index < uniqueBlockNumbers[j].Index
})
Expect(log).NotTo(BeNil())
Expect(len(log)).To(Equal(2))
Expect(uniqueBlockNumbers).To(Equal(
[]logIndex{
{blockNumber: 1, Index: 0},
{blockNumber: 1, Index: 1}},
))
})
})
}