From c00b8a5a9805490e8ec0fd50ef3a887b270085c8 Mon Sep 17 00:00:00 2001 From: Matt K <1036969+mkrump@users.noreply.github.com> Date: Tue, 23 Jan 2018 12:43:35 -0600 Subject: [PATCH] Add Filters (#133) * Add LogFilter struct * Add log_filters table * Add view for events watching * Add cmd line "add_filter" to mimic eventual endpoint * Allow multiple filters in config --- .gitignore | 1 + Gododir/main.go | 23 +++- README.md | 13 +- cmd/add_filter/main.go | 38 ++++++ cmd/utils.go | 11 +- cmd/vulcanize_db/main.go | 10 +- .../1516648743_add_log_filters.down.sql | 1 + .../1516648743_add_log_filters.up.sql | 12 ++ ...1516653373_add_watched_event_logs.down.sql | 1 + .../1516653373_add_watched_event_logs.up.sql | 29 ++++ db/schema.sql | 102 +++++++++++++- filters/example-filter.json | 15 +++ pkg/filters/filter_query.go | 64 +++++++++ pkg/filters/filter_test.go | 125 ++++++++++++++++++ pkg/filters/query_builder_suite_test.go | 13 ++ pkg/repositories/in_memory.go | 22 ++- pkg/repositories/postgres.go | 15 ++- pkg/repositories/repository.go | 6 +- pkg/repositories/testing/helpers.go | 39 ++++++ 19 files changed, 514 insertions(+), 26 deletions(-) create mode 100644 cmd/add_filter/main.go create mode 100644 db/migrations/1516648743_add_log_filters.down.sql create mode 100644 db/migrations/1516648743_add_log_filters.up.sql create mode 100644 db/migrations/1516653373_add_watched_event_logs.down.sql create mode 100644 db/migrations/1516653373_add_watched_event_logs.up.sql create mode 100644 filters/example-filter.json create mode 100644 pkg/filters/filter_query.go create mode 100644 pkg/filters/filter_test.go create mode 100644 pkg/filters/query_builder_suite_test.go diff --git a/.gitignore b/.gitignore index ac6ee651..f5b5e5ae 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ environments/*.toml Vagrantfile vagrant_bootstrap.sh .vagrant +test_scripts/ diff --git a/Gododir/main.go b/Gododir/main.go index 6243f67f..15200298 100644 --- a/Gododir/main.go +++ b/Gododir/main.go @@ -28,13 +28,16 @@ func tasks(p *do.Project) { p.Task("vulcanizeDb", nil, func(context *do.Context) { environment := parseEnvironment(context) - context.Start(`go run main.go --environment={{.environment}}`, - do.M{"environment": environment, "$in": "cmd/vulcanize_db"}) + startingNumber := context.Args.MayInt(0, "starting-number", "s") + context.Start(`go run main.go --environment={{.environment}} --starting-number={{.startingNumber}}`, + do.M{"environment": environment, + "startingNumber": startingNumber, + "$in": "cmd/vulcanize_db"}) }) p.Task("populateBlocks", nil, func(context *do.Context) { environment := parseEnvironment(context) - startingNumber := context.Args.MayInt(-1, "starting-number") + startingNumber := context.Args.MayInt(-1, "starting-number", "s") if startingNumber < 0 { log.Fatalln("--starting-number required") } @@ -42,6 +45,20 @@ func tasks(p *do.Project) { do.M{"environment": environment, "startingNumber": startingNumber, "$in": "cmd/populate_blocks"}) }) + p.Task("watchEvent", nil, func(context *do.Context) { + environment := parseEnvironment(context) + filterFilePath := context.Args.MayString("", "filter-filepath", "f") + if filterFilePath == "" { + log.Fatalln("--filter-filepath required") + } + context.Start(`go run main.go --environment={{.environment}} --filter-filepath={{.filterFilePath}}`, + do.M{ + "environment": environment, + "filterFilePath": filterFilePath, + "$in": "cmd/add_filter", + }) + }) + p.Task("watchContract", nil, func(context *do.Context) { environment := parseEnvironment(context) contractHash := context.Args.MayString("", "contract-hash", "c") diff --git a/README.md b/README.md index e05adda6..b8dc09e3 100644 --- a/README.md +++ b/README.md @@ -60,19 +60,26 @@ The default location for Ethereum is: **Note the location of the ipc file is outputted when you connect to a blockchain. It is needed to for configuration** ## Start Vulcanize DB -1. Start a blockchain. +1. Start geth 2. In a separate terminal start vulcanize_db - `godo vulcanizeDb -- --environment=` +## Watch contract events +1. Start geth +2. In a separate terminal start vulcanize_db + - `godo vulcanizeDb -- --environment=` +3. Create event filter + - `godo watchEvent -- --environment= --filter-filepath=` + ## Running Listener -1. Start a blockchain. +1. Start geth 2. In a separate terminal start listener (ipcDir location) - `godo run -- --environment=` ## Retrieving Historical Data -1. Start a blockchain. +1. Start geth 2. In a separate terminal start listener (ipcDir location) - `godo populateBlocks -- --environment= --starting-number=` diff --git a/cmd/add_filter/main.go b/cmd/add_filter/main.go new file mode 100644 index 00000000..026c0854 --- /dev/null +++ b/cmd/add_filter/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "encoding/json" + "flag" + "io/ioutil" + "log" + + "github.com/8thlight/vulcanizedb/cmd" + "github.com/8thlight/vulcanizedb/pkg/filters" + "github.com/8thlight/vulcanizedb/pkg/geth" +) + +func main() { + environment := flag.String("environment", "", "Environment name") + filterFilePath := flag.String("filter-filepath", "", "path/to/filter.json") + + flag.Parse() + var logFilters filters.LogFilters + config := cmd.LoadConfig(*environment) + blockchain := geth.NewBlockchain(config.Client.IPCPath) + repository := cmd.LoadPostgres(config.Database, blockchain.Node()) + absFilePath := cmd.AbsFilePath(*filterFilePath) + logFilterBytes, err := ioutil.ReadFile(absFilePath) + if err != nil { + log.Fatal(err) + } + err = json.Unmarshal(logFilterBytes, &logFilters) + if err != nil { + log.Fatal(err) + } + for _, filter := range logFilters { + err = repository.AddFilter(filter) + if err != nil { + log.Fatal(err) + } + } +} diff --git a/cmd/utils.go b/cmd/utils.go index c7e5f8db..03a6be65 100644 --- a/cmd/utils.go +++ b/cmd/utils.go @@ -30,9 +30,7 @@ func LoadPostgres(database config.Database, node core.Node) repositories.Postgre } func ReadAbiFile(abiFilepath string) string { - if !filepath.IsAbs(abiFilepath) { - abiFilepath = filepath.Join(config.ProjectRoot(), abiFilepath) - } + abiFilepath = AbsFilePath(abiFilepath) abi, err := geth.ReadAbiFile(abiFilepath) if err != nil { log.Fatalf("Error reading ABI file at \"%s\"\n %v", abiFilepath, err) @@ -40,6 +38,13 @@ func ReadAbiFile(abiFilepath string) string { return abi } +func AbsFilePath(filePath string) string { + if !filepath.IsAbs(filePath) { + filePath = filepath.Join(config.ProjectRoot(), filePath) + } + return filePath +} + func GetAbi(abiFilepath string, contractHash string, network string) string { var contractAbiString string if abiFilepath != "" { diff --git a/cmd/vulcanize_db/main.go b/cmd/vulcanize_db/main.go index b74be6b5..24cf0776 100644 --- a/cmd/vulcanize_db/main.go +++ b/cmd/vulcanize_db/main.go @@ -18,14 +18,15 @@ const ( pollingInterval = 7 * time.Second ) -func backFillAllBlocks(blockchain core.Blockchain, repository repositories.Postgres, missingBlocksPopulated chan int) { +func backFillAllBlocks(blockchain core.Blockchain, repository repositories.Postgres, missingBlocksPopulated chan int, startingBlockNumber int64) { go func() { - missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, 0) + missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, startingBlockNumber) }() } func main() { environment := flag.String("environment", "", "Environment name") + startingBlockNumber := flag.Int("starting-number", 0, "First block to fill from") flag.Parse() ticker := time.NewTicker(pollingInterval) @@ -37,7 +38,8 @@ func main() { validator := history.NewBlockValidator(blockchain, repository, 15) missingBlocksPopulated := make(chan int) - go backFillAllBlocks(blockchain, repository, missingBlocksPopulated) + _startingBlockNumber := int64(*startingBlockNumber) + go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber) for { select { @@ -45,7 +47,7 @@ func main() { window := validator.ValidateBlocks() validator.Log(os.Stdout, window) case <-missingBlocksPopulated: - go backFillAllBlocks(blockchain, repository, missingBlocksPopulated) + go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber) } } } diff --git a/db/migrations/1516648743_add_log_filters.down.sql b/db/migrations/1516648743_add_log_filters.down.sql new file mode 100644 index 00000000..a28adeee --- /dev/null +++ b/db/migrations/1516648743_add_log_filters.down.sql @@ -0,0 +1 @@ +DROP TABLE log_filters; \ No newline at end of file diff --git a/db/migrations/1516648743_add_log_filters.up.sql b/db/migrations/1516648743_add_log_filters.up.sql new file mode 100644 index 00000000..100a5718 --- /dev/null +++ b/db/migrations/1516648743_add_log_filters.up.sql @@ -0,0 +1,12 @@ +CREATE TABLE log_filters ( + id SERIAL, + name VARCHAR NOT NULL CHECK (name <> ''), + from_block BIGINT CHECK (from_block >= 0), + to_block BIGINT CHECK (from_block >= 0), + address VARCHAR(66), + topic0 VARCHAR(66), + topic1 VARCHAR(66), + topic2 VARCHAR(66), + topic3 VARCHAR(66), + CONSTRAINT name_uc UNIQUE (name) +); \ No newline at end of file diff --git a/db/migrations/1516653373_add_watched_event_logs.down.sql b/db/migrations/1516653373_add_watched_event_logs.down.sql new file mode 100644 index 00000000..14bb2987 --- /dev/null +++ b/db/migrations/1516653373_add_watched_event_logs.down.sql @@ -0,0 +1 @@ +DROP VIEW watched_event_logs; \ No newline at end of file diff --git a/db/migrations/1516653373_add_watched_event_logs.up.sql b/db/migrations/1516653373_add_watched_event_logs.up.sql new file mode 100644 index 00000000..3bba916f --- /dev/null +++ b/db/migrations/1516653373_add_watched_event_logs.up.sql @@ -0,0 +1,29 @@ +CREATE VIEW block_stats AS + SELECT + max(block_number) AS max_block, + min(block_number) AS min_block + FROM logs; + +CREATE VIEW watched_event_logs AS + SELECT + log_filters.name, + logs.id, + block_number, + logs.address, + tx_hash, + index, + logs.topic0, + logs.topic1, + logs.topic2, + logs.topic3, + data, + receipt_id + FROM log_filters + CROSS JOIN block_stats + JOIN logs ON logs.address = log_filters.address + AND logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block) + AND logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block) + WHERE (log_filters.topic0 = logs.topic0 OR log_filters.topic0 ISNULL) + AND (log_filters.topic1 = logs.topic1 OR log_filters.topic1 ISNULL) + AND (log_filters.topic2 = logs.topic2 OR log_filters.topic2 ISNULL) + AND (log_filters.topic3 = logs.topic3 OR log_filters.topic3 ISNULL); \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index e2e1ece3..9ba46900 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -34,6 +34,35 @@ SET default_tablespace = ''; SET default_with_oids = false; +-- +-- Name: logs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE logs ( + id integer NOT NULL, + block_number bigint, + address character varying(66), + tx_hash character varying(66), + index bigint, + topic0 character varying(66), + topic1 character varying(66), + topic2 character varying(66), + topic3 character varying(66), + data text, + receipt_id integer +); + + +-- +-- Name: block_stats; Type: VIEW; Schema: public; Owner: - +-- + +CREATE VIEW block_stats AS + SELECT max(logs.block_number) AS max_block, + min(logs.block_number) AS min_block + FROM logs; + + -- -- Name: blocks; Type: TABLE; Schema: public; Owner: - -- @@ -80,24 +109,45 @@ ALTER SEQUENCE blocks_id_seq OWNED BY blocks.id; -- --- Name: logs; Type: TABLE; Schema: public; Owner: - +-- Name: log_filters; Type: TABLE; Schema: public; Owner: - -- -CREATE TABLE logs ( +CREATE TABLE log_filters ( id integer NOT NULL, - block_number bigint, + name character varying NOT NULL, + from_block bigint, + to_block bigint, address character varying(66), - tx_hash character varying(66), - index bigint, topic0 character varying(66), topic1 character varying(66), topic2 character varying(66), topic3 character varying(66), - data text, - receipt_id integer + CONSTRAINT log_filters_from_block_check CHECK ((from_block >= 0)), + CONSTRAINT log_filters_from_block_check1 CHECK ((from_block >= 0)), + CONSTRAINT log_filters_name_check CHECK (((name)::text <> ''::text)) ); +-- +-- Name: log_filters_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE log_filters_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: log_filters_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE log_filters_id_seq OWNED BY log_filters.id; + + -- -- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- @@ -266,6 +316,29 @@ CREATE SEQUENCE watched_contracts_contract_id_seq ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.contract_id; +-- +-- Name: watched_event_logs; Type: VIEW; Schema: public; Owner: - +-- + +CREATE VIEW watched_event_logs AS + SELECT log_filters.name, + logs.id, + logs.block_number, + logs.address, + logs.tx_hash, + logs.index, + logs.topic0, + logs.topic1, + logs.topic2, + logs.topic3, + logs.data, + logs.receipt_id + FROM ((log_filters + CROSS JOIN block_stats) + JOIN logs ON ((((logs.address)::text = (log_filters.address)::text) AND (logs.block_number >= COALESCE(log_filters.from_block, block_stats.min_block)) AND (logs.block_number <= COALESCE(log_filters.to_block, block_stats.max_block))))) + WHERE ((((log_filters.topic0)::text = (logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (logs.topic3)::text) OR (log_filters.topic3 IS NULL))); + + -- -- Name: blocks id; Type: DEFAULT; Schema: public; Owner: - -- @@ -273,6 +346,13 @@ ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.cont ALTER TABLE ONLY blocks ALTER COLUMN id SET DEFAULT nextval('blocks_id_seq'::regclass); +-- +-- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY log_filters ALTER COLUMN id SET DEFAULT nextval('log_filters_id_seq'::regclass); + + -- -- Name: logs id; Type: DEFAULT; Schema: public; Owner: - -- @@ -332,6 +412,14 @@ ALTER TABLE ONLY logs ADD CONSTRAINT logs_pkey PRIMARY KEY (id); +-- +-- Name: log_filters name_uc; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY log_filters + ADD CONSTRAINT name_uc UNIQUE (name); + + -- -- Name: blocks node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: - -- diff --git a/filters/example-filter.json b/filters/example-filter.json new file mode 100644 index 00000000..b123b0eb --- /dev/null +++ b/filters/example-filter.json @@ -0,0 +1,15 @@ +[{ +"name": "TransferFilter", +"fromBlock": "0x488290", +"toBlock": "0x488678", +"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", +"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"] +}, +{ + "name": "NewFilter", + "toBlock": "0x4B34AA", + "fromBlock": "0x4B34AD", + "address": "0x06012c8cf97bead5deae237070f9587f8e7a266d", + "topics": ["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"] +}] + diff --git a/pkg/filters/filter_query.go b/pkg/filters/filter_query.go new file mode 100644 index 00000000..0fb4e8bd --- /dev/null +++ b/pkg/filters/filter_query.go @@ -0,0 +1,64 @@ +package filters + +import ( + "encoding/json" + + "errors" + + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +type LogFilters []LogFilter + +type LogFilter struct { + Name string `json:"name"` + FromBlock int64 `json:"fromBlock"` + ToBlock int64 `json:"toBlock"` + Address string `json:"address"` + core.Topics `json:"topics"` +} + +func (filterQuery *LogFilter) UnmarshalJSON(input []byte) error { + type Alias LogFilter + + var err error + aux := &struct { + ToBlock string `json:"toBlock"` + FromBlock string `json:"fromBlock"` + *Alias + }{ + Alias: (*Alias)(filterQuery), + } + if err := json.Unmarshal(input, &aux); err != nil { + return err + } + if filterQuery.Name == "" { + return errors.New("filters: must provide name for logfilter") + } + filterQuery.ToBlock, err = filterQuery.unmarshalFromToBlock(aux.ToBlock) + if err != nil { + return errors.New("filters: invalid fromBlock") + } + filterQuery.FromBlock, err = filterQuery.unmarshalFromToBlock(aux.FromBlock) + if err != nil { + return errors.New("filters: invalid fromBlock") + } + if !common.IsHexAddress(filterQuery.Address) { + return errors.New("filters: invalid address") + } + + return nil +} + +func (filterQuery *LogFilter) unmarshalFromToBlock(auxBlock string) (int64, error) { + if auxBlock == "" { + return -1, nil + } + block, err := hexutil.DecodeUint64(auxBlock) + if err != nil { + return 0, errors.New("filters: invalid block arg") + } + return int64(block), nil +} diff --git a/pkg/filters/filter_test.go b/pkg/filters/filter_test.go new file mode 100644 index 00000000..c2978771 --- /dev/null +++ b/pkg/filters/filter_test.go @@ -0,0 +1,125 @@ +package filters_test + +import ( + "encoding/json" + + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/filters" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Log filters", func() { + It("decodes web3 filter to LogFilter", func() { + + var logFilter filters.LogFilter + jsonFilter := []byte( + `{ + "name": "TestEvent", + "fromBlock": "0x1", + "toBlock": "0x488290", + "address": "0x8888f1f195afa192cfee860698584c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }`) + err := json.Unmarshal(jsonFilter, &logFilter) + + Expect(err).ToNot(HaveOccurred()) + Expect(logFilter.Name).To(Equal("TestEvent")) + Expect(logFilter.FromBlock).To(Equal(int64(1))) + Expect(logFilter.ToBlock).To(Equal(int64(4752016))) + Expect(logFilter.Address).To(Equal("0x8888f1f195afa192cfee860698584c030f4c9db1")) + Expect(logFilter.Topics).To(Equal( + core.Topics{ + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "", + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + ""})) + }) + + It("decodes array of web3 filters to []LogFilter", func() { + + logFilters := make([]filters.LogFilter, 0) + jsonFilter := []byte( + `[{ + "name": "TestEvent", + "fromBlock": "0x1", + "toBlock": "0x488290", + "address": "0x8888f1f195afa192cfee860698584c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }, + { + "name": "TestEvent2", + "fromBlock": "0x3", + "toBlock": "0x4", + "address": "0xd26114cd6EE289AccF82350c8d8487fedB8A0C07", + "topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0x0000000000000000000000006b0949d4c6edfe467db78241b7d5566f3c2bb43e", "0x0000000000000000000000005e44c3e467a49c9ca0296a9f130fc433041aaa28"] + }]`) + err := json.Unmarshal(jsonFilter, &logFilters) + + Expect(err).ToNot(HaveOccurred()) + Expect(len(logFilters)).To(Equal(2)) + Expect(logFilters[0].Name).To(Equal("TestEvent")) + Expect(logFilters[1].Name).To(Equal("TestEvent2")) + }) + + It("requires valid ethereum address", func() { + + var logFilter filters.LogFilter + jsonFilter := []byte( + `{ + "name": "TestEvent", + "fromBlock": "0x1", + "toBlock": "0x2", + "address": "0x8888f1f195afa192cf84c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }`) + err := json.Unmarshal(jsonFilter, &logFilter) + Expect(err).To(HaveOccurred()) + + }) + It("requires name", func() { + + var logFilter filters.LogFilter + jsonFilter := []byte( + `{ + "fromBlock": "0x1", + "toBlock": "0x2", + "address": "0x8888f1f195afa192cfee860698584c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }`) + err := json.Unmarshal(jsonFilter, &logFilter) + Expect(err).To(HaveOccurred()) + + }) + + It("maps missing fromBlock to -1", func() { + + var logFilter filters.LogFilter + jsonFilter := []byte( + `{ + "name": "TestEvent", + "toBlock": "0x2", + "address": "0x8888f1f195afa192cfee860698584c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }`) + err := json.Unmarshal(jsonFilter, &logFilter) + Expect(err).ToNot(HaveOccurred()) + Expect(logFilter.FromBlock).To(Equal(int64(-1))) + + }) + + It("maps missing toBlock to -1", func() { + var logFilter filters.LogFilter + jsonFilter := []byte( + `{ + "name": "TestEvent", + "address": "0x8888f1f195afa192cfee860698584c030f4c9db1", + "topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null] + }`) + err := json.Unmarshal(jsonFilter, &logFilter) + Expect(err).ToNot(HaveOccurred()) + Expect(logFilter.ToBlock).To(Equal(int64(-1))) + + }) + +}) diff --git a/pkg/filters/query_builder_suite_test.go b/pkg/filters/query_builder_suite_test.go new file mode 100644 index 00000000..42029812 --- /dev/null +++ b/pkg/filters/query_builder_suite_test.go @@ -0,0 +1,13 @@ +package filters_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestQueryBuilder(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "QueryBuilder Suite") +} diff --git a/pkg/repositories/in_memory.go b/pkg/repositories/in_memory.go index 0a609231..838cdefc 100644 --- a/pkg/repositories/in_memory.go +++ b/pkg/repositories/in_memory.go @@ -3,7 +3,10 @@ package repositories import ( "fmt" + "errors" + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/filters" ) type InMemory struct { @@ -11,16 +14,27 @@ type InMemory struct { receipts map[string]core.Receipt contracts map[string]core.Contract logs map[string][]core.Log + logFilters map[string]filters.LogFilter CreateOrUpdateBlockCallCount int } +func (repository *InMemory) AddFilter(filter filters.LogFilter) error { + key := filter.Name + if _, ok := repository.logFilters[key]; ok || key == "" { + return errors.New("filter name not unique") + } + repository.logFilters[key] = filter + return nil +} + func NewInMemory() *InMemory { return &InMemory{ CreateOrUpdateBlockCallCount: 0, - blocks: make(map[int64]core.Block), - receipts: make(map[string]core.Receipt), - contracts: make(map[string]core.Contract), - logs: make(map[string][]core.Log), + blocks: make(map[int64]core.Block), + receipts: make(map[string]core.Receipt), + contracts: make(map[string]core.Contract), + logs: make(map[string][]core.Log), + logFilters: make(map[string]filters.LogFilter), } } diff --git a/pkg/repositories/postgres.go b/pkg/repositories/postgres.go index fdc92f74..11fa5f23 100644 --- a/pkg/repositories/postgres.go +++ b/pkg/repositories/postgres.go @@ -11,6 +11,7 @@ import ( "github.com/8thlight/vulcanizedb/pkg/config" "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/filters" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" ) @@ -303,7 +304,7 @@ func (repository Postgres) createTransaction(tx *sql.Tx, blockId int64, transact err := tx.QueryRow( `INSERT INTO transactions (block_id, tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value, tx_input_data) - VALUES ($1, $2, $3, $4, $5, $6, $7, cast(NULLIF($8, '') as NUMERIC), $9) + VALUES ($1, $2, $3, $4, $5, $6, $7, cast(NULLIF($8, '') AS NUMERIC), $9) RETURNING id`, blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.From, transaction.GasLimit, transaction.GasPrice, transaction.Value, transaction.Data). Scan(&transactionId) @@ -381,6 +382,18 @@ func (repository Postgres) CreateLogs(logs []core.Log) error { return nil } +func (repository Postgres) AddFilter(query filters.LogFilter) error { + _, err := repository.Db.Exec( + `INSERT INTO log_filters + (name, from_block, to_block, address, topic0, topic1, topic2, topic3) + VALUES ($1, NULLIF($2, -1), NULLIF($3, -1), $4, NULLIF($5, ''), NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''))`, + query.Name, query.FromBlock, query.ToBlock, query.Address, query.Topics[0], query.Topics[1], query.Topics[2], query.Topics[3]) + if err != nil { + return err + } + return nil +} + func loadReceipt(receiptsRow *sql.Row) (core.Receipt, error) { var contractAddress string var txHash string diff --git a/pkg/repositories/repository.go b/pkg/repositories/repository.go index df9a8172..7e4b7cc4 100644 --- a/pkg/repositories/repository.go +++ b/pkg/repositories/repository.go @@ -1,6 +1,9 @@ package repositories -import "github.com/8thlight/vulcanizedb/pkg/core" +import ( + "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/filters" +) const ( blocksFromHeadBeforeFinal = 20 @@ -19,4 +22,5 @@ type Repository interface { CreateLogs(log []core.Log) error FindLogs(address string, blockNumber int64) []core.Log SetBlocksStatus(chainHead int64) + AddFilter(filter filters.LogFilter) error } diff --git a/pkg/repositories/testing/helpers.go b/pkg/repositories/testing/helpers.go index c258f041..ed8ea710 100644 --- a/pkg/repositories/testing/helpers.go +++ b/pkg/repositories/testing/helpers.go @@ -7,6 +7,7 @@ import ( "math/big" "github.com/8thlight/vulcanizedb/pkg/core" + "github.com/8thlight/vulcanizedb/pkg/filters" "github.com/8thlight/vulcanizedb/pkg/repositories" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -18,6 +19,7 @@ func ClearData(postgres repositories.Postgres) { postgres.Db.MustExec("DELETE FROM blocks") postgres.Db.MustExec("DELETE FROM logs") postgres.Db.MustExec("DELETE FROM receipts") + postgres.Db.MustExec("DELETE FROM log_filters") } func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.Repository) { @@ -600,6 +602,43 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories. Expect(err).To(HaveOccurred()) Expect(receipt).To(BeZero()) }) + }) + Describe("LogFilter", func() { + + It("inserts filter into watched events", func() { + + logFilter := filters.LogFilter{ + Name: "TestFilter", + FromBlock: 1, + ToBlock: 2, + Address: "0x8888f1f195afa192cfee860698584c030f4c9db1", + Topics: core.Topics{ + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "", + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "", + }, + } + err := repository.AddFilter(logFilter) + Expect(err).ToNot(HaveOccurred()) + }) + + It("returns error if name is not provided", func() { + + logFilter := filters.LogFilter{ + FromBlock: 1, + ToBlock: 2, + Address: "0x8888f1f195afa192cfee860698584c030f4c9db1", + Topics: core.Topics{ + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "", + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "", + }, + } + err := repository.AddFilter(logFilter) + Expect(err).To(HaveOccurred()) + }) }) }