Add Filters (#133)

* Add LogFilter struct

* Add log_filters table

* Add view for events watching

* Add cmd line "add_filter" to mimic eventual endpoint

* Allow multiple filters in config
This commit is contained in:
Matt K 2018-01-23 12:43:35 -06:00 committed by GitHub
parent 3f06c7374b
commit c00b8a5a98
19 changed files with 514 additions and 26 deletions

1
.gitignore vendored
View File

@ -7,3 +7,4 @@ environments/*.toml
Vagrantfile Vagrantfile
vagrant_bootstrap.sh vagrant_bootstrap.sh
.vagrant .vagrant
test_scripts/

View File

@ -28,13 +28,16 @@ func tasks(p *do.Project) {
p.Task("vulcanizeDb", nil, func(context *do.Context) { p.Task("vulcanizeDb", nil, func(context *do.Context) {
environment := parseEnvironment(context) environment := parseEnvironment(context)
context.Start(`go run main.go --environment={{.environment}}`, startingNumber := context.Args.MayInt(0, "starting-number", "s")
do.M{"environment": environment, "$in": "cmd/vulcanize_db"}) context.Start(`go run main.go --environment={{.environment}} --starting-number={{.startingNumber}}`,
do.M{"environment": environment,
"startingNumber": startingNumber,
"$in": "cmd/vulcanize_db"})
}) })
p.Task("populateBlocks", nil, func(context *do.Context) { p.Task("populateBlocks", nil, func(context *do.Context) {
environment := parseEnvironment(context) environment := parseEnvironment(context)
startingNumber := context.Args.MayInt(-1, "starting-number") startingNumber := context.Args.MayInt(-1, "starting-number", "s")
if startingNumber < 0 { if startingNumber < 0 {
log.Fatalln("--starting-number required") log.Fatalln("--starting-number required")
} }
@ -42,6 +45,20 @@ func tasks(p *do.Project) {
do.M{"environment": environment, "startingNumber": startingNumber, "$in": "cmd/populate_blocks"}) do.M{"environment": environment, "startingNumber": startingNumber, "$in": "cmd/populate_blocks"})
}) })
p.Task("watchEvent", nil, func(context *do.Context) {
environment := parseEnvironment(context)
filterFilePath := context.Args.MayString("", "filter-filepath", "f")
if filterFilePath == "" {
log.Fatalln("--filter-filepath required")
}
context.Start(`go run main.go --environment={{.environment}} --filter-filepath={{.filterFilePath}}`,
do.M{
"environment": environment,
"filterFilePath": filterFilePath,
"$in": "cmd/add_filter",
})
})
p.Task("watchContract", nil, func(context *do.Context) { p.Task("watchContract", nil, func(context *do.Context) {
environment := parseEnvironment(context) environment := parseEnvironment(context)
contractHash := context.Args.MayString("", "contract-hash", "c") contractHash := context.Args.MayString("", "contract-hash", "c")

View File

@ -60,19 +60,26 @@ The default location for Ethereum is:
**Note the location of the ipc file is outputted when you connect to a blockchain. It is needed to for configuration** **Note the location of the ipc file is outputted when you connect to a blockchain. It is needed to for configuration**
## Start Vulcanize DB ## Start Vulcanize DB
1. Start a blockchain. 1. Start geth
2. In a separate terminal start vulcanize_db 2. In a separate terminal start vulcanize_db
- `godo vulcanizeDb -- --environment=<some-environment>` - `godo vulcanizeDb -- --environment=<some-environment>`
## Watch contract events
1. Start geth
2. In a separate terminal start vulcanize_db
- `godo vulcanizeDb -- --environment=<some-environment>`
3. Create event filter
- `godo watchEvent -- --environment=<some-environment> --filter-filepath=<filters/example-filer.json>`
## Running Listener ## Running Listener
1. Start a blockchain. 1. Start geth
2. In a separate terminal start listener (ipcDir location) 2. In a separate terminal start listener (ipcDir location)
- `godo run -- --environment=<some-environment>` - `godo run -- --environment=<some-environment>`
## Retrieving Historical Data ## Retrieving Historical Data
1. Start a blockchain. 1. Start geth
2. In a separate terminal start listener (ipcDir location) 2. In a separate terminal start listener (ipcDir location)
- `godo populateBlocks -- --environment=<some-environment> --starting-number=<starting-block-number>` - `godo populateBlocks -- --environment=<some-environment> --starting-number=<starting-block-number>`

38
cmd/add_filter/main.go Normal file
View File

@ -0,0 +1,38 @@
package main
import (
"encoding/json"
"flag"
"io/ioutil"
"log"
"github.com/8thlight/vulcanizedb/cmd"
"github.com/8thlight/vulcanizedb/pkg/filters"
"github.com/8thlight/vulcanizedb/pkg/geth"
)
func main() {
environment := flag.String("environment", "", "Environment name")
filterFilePath := flag.String("filter-filepath", "", "path/to/filter.json")
flag.Parse()
var logFilters filters.LogFilters
config := cmd.LoadConfig(*environment)
blockchain := geth.NewBlockchain(config.Client.IPCPath)
repository := cmd.LoadPostgres(config.Database, blockchain.Node())
absFilePath := cmd.AbsFilePath(*filterFilePath)
logFilterBytes, err := ioutil.ReadFile(absFilePath)
if err != nil {
log.Fatal(err)
}
err = json.Unmarshal(logFilterBytes, &logFilters)
if err != nil {
log.Fatal(err)
}
for _, filter := range logFilters {
err = repository.AddFilter(filter)
if err != nil {
log.Fatal(err)
}
}
}

View File

@ -30,9 +30,7 @@ func LoadPostgres(database config.Database, node core.Node) repositories.Postgre
} }
func ReadAbiFile(abiFilepath string) string { func ReadAbiFile(abiFilepath string) string {
if !filepath.IsAbs(abiFilepath) { abiFilepath = AbsFilePath(abiFilepath)
abiFilepath = filepath.Join(config.ProjectRoot(), abiFilepath)
}
abi, err := geth.ReadAbiFile(abiFilepath) abi, err := geth.ReadAbiFile(abiFilepath)
if err != nil { if err != nil {
log.Fatalf("Error reading ABI file at \"%s\"\n %v", abiFilepath, err) log.Fatalf("Error reading ABI file at \"%s\"\n %v", abiFilepath, err)
@ -40,6 +38,13 @@ func ReadAbiFile(abiFilepath string) string {
return abi return abi
} }
func AbsFilePath(filePath string) string {
if !filepath.IsAbs(filePath) {
filePath = filepath.Join(config.ProjectRoot(), filePath)
}
return filePath
}
func GetAbi(abiFilepath string, contractHash string, network string) string { func GetAbi(abiFilepath string, contractHash string, network string) string {
var contractAbiString string var contractAbiString string
if abiFilepath != "" { if abiFilepath != "" {

View File

@ -18,14 +18,15 @@ const (
pollingInterval = 7 * time.Second pollingInterval = 7 * time.Second
) )
func backFillAllBlocks(blockchain core.Blockchain, repository repositories.Postgres, missingBlocksPopulated chan int) { func backFillAllBlocks(blockchain core.Blockchain, repository repositories.Postgres, missingBlocksPopulated chan int, startingBlockNumber int64) {
go func() { go func() {
missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, 0) missingBlocksPopulated <- history.PopulateMissingBlocks(blockchain, repository, startingBlockNumber)
}() }()
} }
func main() { func main() {
environment := flag.String("environment", "", "Environment name") environment := flag.String("environment", "", "Environment name")
startingBlockNumber := flag.Int("starting-number", 0, "First block to fill from")
flag.Parse() flag.Parse()
ticker := time.NewTicker(pollingInterval) ticker := time.NewTicker(pollingInterval)
@ -37,7 +38,8 @@ func main() {
validator := history.NewBlockValidator(blockchain, repository, 15) validator := history.NewBlockValidator(blockchain, repository, 15)
missingBlocksPopulated := make(chan int) missingBlocksPopulated := make(chan int)
go backFillAllBlocks(blockchain, repository, missingBlocksPopulated) _startingBlockNumber := int64(*startingBlockNumber)
go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber)
for { for {
select { select {
@ -45,7 +47,7 @@ func main() {
window := validator.ValidateBlocks() window := validator.ValidateBlocks()
validator.Log(os.Stdout, window) validator.Log(os.Stdout, window)
case <-missingBlocksPopulated: case <-missingBlocksPopulated:
go backFillAllBlocks(blockchain, repository, missingBlocksPopulated) go backFillAllBlocks(blockchain, repository, missingBlocksPopulated, _startingBlockNumber)
} }
} }
} }

View File

@ -0,0 +1 @@
DROP TABLE log_filters;

View File

@ -0,0 +1,12 @@
CREATE TABLE log_filters (
id SERIAL,
name VARCHAR NOT NULL CHECK (name <> ''),
from_block BIGINT CHECK (from_block >= 0),
to_block BIGINT CHECK (from_block >= 0),
address VARCHAR(66),
topic0 VARCHAR(66),
topic1 VARCHAR(66),
topic2 VARCHAR(66),
topic3 VARCHAR(66),
CONSTRAINT name_uc UNIQUE (name)
);

View File

@ -0,0 +1 @@
DROP VIEW watched_event_logs;

View File

@ -0,0 +1,29 @@
CREATE VIEW block_stats AS
SELECT
max(block_number) AS max_block,
min(block_number) AS min_block
FROM logs;
CREATE VIEW watched_event_logs AS
SELECT
log_filters.name,
logs.id,
block_number,
logs.address,
tx_hash,
index,
logs.topic0,
logs.topic1,
logs.topic2,
logs.topic3,
data,
receipt_id
FROM log_filters
CROSS JOIN block_stats
JOIN logs ON logs.address = log_filters.address
AND logs.block_number >= coalesce(log_filters.from_block, block_stats.min_block)
AND logs.block_number <= coalesce(log_filters.to_block, block_stats.max_block)
WHERE (log_filters.topic0 = logs.topic0 OR log_filters.topic0 ISNULL)
AND (log_filters.topic1 = logs.topic1 OR log_filters.topic1 ISNULL)
AND (log_filters.topic2 = logs.topic2 OR log_filters.topic2 ISNULL)
AND (log_filters.topic3 = logs.topic3 OR log_filters.topic3 ISNULL);

View File

@ -34,6 +34,35 @@ SET default_tablespace = '';
SET default_with_oids = false; SET default_with_oids = false;
--
-- Name: logs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE logs (
id integer NOT NULL,
block_number bigint,
address character varying(66),
tx_hash character varying(66),
index bigint,
topic0 character varying(66),
topic1 character varying(66),
topic2 character varying(66),
topic3 character varying(66),
data text,
receipt_id integer
);
--
-- Name: block_stats; Type: VIEW; Schema: public; Owner: -
--
CREATE VIEW block_stats AS
SELECT max(logs.block_number) AS max_block,
min(logs.block_number) AS min_block
FROM logs;
-- --
-- Name: blocks; Type: TABLE; Schema: public; Owner: - -- Name: blocks; Type: TABLE; Schema: public; Owner: -
-- --
@ -80,24 +109,45 @@ ALTER SEQUENCE blocks_id_seq OWNED BY blocks.id;
-- --
-- Name: logs; Type: TABLE; Schema: public; Owner: - -- Name: log_filters; Type: TABLE; Schema: public; Owner: -
-- --
CREATE TABLE logs ( CREATE TABLE log_filters (
id integer NOT NULL, id integer NOT NULL,
block_number bigint, name character varying NOT NULL,
from_block bigint,
to_block bigint,
address character varying(66), address character varying(66),
tx_hash character varying(66),
index bigint,
topic0 character varying(66), topic0 character varying(66),
topic1 character varying(66), topic1 character varying(66),
topic2 character varying(66), topic2 character varying(66),
topic3 character varying(66), topic3 character varying(66),
data text, CONSTRAINT log_filters_from_block_check CHECK ((from_block >= 0)),
receipt_id integer CONSTRAINT log_filters_from_block_check1 CHECK ((from_block >= 0)),
CONSTRAINT log_filters_name_check CHECK (((name)::text <> ''::text))
); );
--
-- Name: log_filters_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE log_filters_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: log_filters_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE log_filters_id_seq OWNED BY log_filters.id;
-- --
-- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - -- Name: logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
-- --
@ -266,6 +316,29 @@ CREATE SEQUENCE watched_contracts_contract_id_seq
ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.contract_id; ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.contract_id;
--
-- Name: watched_event_logs; Type: VIEW; Schema: public; Owner: -
--
CREATE VIEW watched_event_logs AS
SELECT log_filters.name,
logs.id,
logs.block_number,
logs.address,
logs.tx_hash,
logs.index,
logs.topic0,
logs.topic1,
logs.topic2,
logs.topic3,
logs.data,
logs.receipt_id
FROM ((log_filters
CROSS JOIN block_stats)
JOIN logs ON ((((logs.address)::text = (log_filters.address)::text) AND (logs.block_number >= COALESCE(log_filters.from_block, block_stats.min_block)) AND (logs.block_number <= COALESCE(log_filters.to_block, block_stats.max_block)))))
WHERE ((((log_filters.topic0)::text = (logs.topic0)::text) OR (log_filters.topic0 IS NULL)) AND (((log_filters.topic1)::text = (logs.topic1)::text) OR (log_filters.topic1 IS NULL)) AND (((log_filters.topic2)::text = (logs.topic2)::text) OR (log_filters.topic2 IS NULL)) AND (((log_filters.topic3)::text = (logs.topic3)::text) OR (log_filters.topic3 IS NULL)));
-- --
-- Name: blocks id; Type: DEFAULT; Schema: public; Owner: - -- Name: blocks id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -273,6 +346,13 @@ ALTER SEQUENCE watched_contracts_contract_id_seq OWNED BY watched_contracts.cont
ALTER TABLE ONLY blocks ALTER COLUMN id SET DEFAULT nextval('blocks_id_seq'::regclass); ALTER TABLE ONLY blocks ALTER COLUMN id SET DEFAULT nextval('blocks_id_seq'::regclass);
--
-- Name: log_filters id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY log_filters ALTER COLUMN id SET DEFAULT nextval('log_filters_id_seq'::regclass);
-- --
-- Name: logs id; Type: DEFAULT; Schema: public; Owner: - -- Name: logs id; Type: DEFAULT; Schema: public; Owner: -
-- --
@ -332,6 +412,14 @@ ALTER TABLE ONLY logs
ADD CONSTRAINT logs_pkey PRIMARY KEY (id); ADD CONSTRAINT logs_pkey PRIMARY KEY (id);
--
-- Name: log_filters name_uc; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY log_filters
ADD CONSTRAINT name_uc UNIQUE (name);
-- --
-- Name: blocks node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: - -- Name: blocks node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: -
-- --

View File

@ -0,0 +1,15 @@
[{
"name": "TransferFilter",
"fromBlock": "0x488290",
"toBlock": "0x488678",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"]
},
{
"name": "NewFilter",
"toBlock": "0x4B34AA",
"fromBlock": "0x4B34AD",
"address": "0x06012c8cf97bead5deae237070f9587f8e7a266d",
"topics": ["0x241ea03ca20251805084d27d4440371c34a0b85ff108f6bb5611248f73818b80"]
}]

View File

@ -0,0 +1,64 @@
package filters
import (
"encoding/json"
"errors"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
type LogFilters []LogFilter
type LogFilter struct {
Name string `json:"name"`
FromBlock int64 `json:"fromBlock"`
ToBlock int64 `json:"toBlock"`
Address string `json:"address"`
core.Topics `json:"topics"`
}
func (filterQuery *LogFilter) UnmarshalJSON(input []byte) error {
type Alias LogFilter
var err error
aux := &struct {
ToBlock string `json:"toBlock"`
FromBlock string `json:"fromBlock"`
*Alias
}{
Alias: (*Alias)(filterQuery),
}
if err := json.Unmarshal(input, &aux); err != nil {
return err
}
if filterQuery.Name == "" {
return errors.New("filters: must provide name for logfilter")
}
filterQuery.ToBlock, err = filterQuery.unmarshalFromToBlock(aux.ToBlock)
if err != nil {
return errors.New("filters: invalid fromBlock")
}
filterQuery.FromBlock, err = filterQuery.unmarshalFromToBlock(aux.FromBlock)
if err != nil {
return errors.New("filters: invalid fromBlock")
}
if !common.IsHexAddress(filterQuery.Address) {
return errors.New("filters: invalid address")
}
return nil
}
func (filterQuery *LogFilter) unmarshalFromToBlock(auxBlock string) (int64, error) {
if auxBlock == "" {
return -1, nil
}
block, err := hexutil.DecodeUint64(auxBlock)
if err != nil {
return 0, errors.New("filters: invalid block arg")
}
return int64(block), nil
}

125
pkg/filters/filter_test.go Normal file
View File

@ -0,0 +1,125 @@
package filters_test
import (
"encoding/json"
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/filters"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Log filters", func() {
It("decodes web3 filter to LogFilter", func() {
var logFilter filters.LogFilter
jsonFilter := []byte(
`{
"name": "TestEvent",
"fromBlock": "0x1",
"toBlock": "0x488290",
"address": "0x8888f1f195afa192cfee860698584c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
}`)
err := json.Unmarshal(jsonFilter, &logFilter)
Expect(err).ToNot(HaveOccurred())
Expect(logFilter.Name).To(Equal("TestEvent"))
Expect(logFilter.FromBlock).To(Equal(int64(1)))
Expect(logFilter.ToBlock).To(Equal(int64(4752016)))
Expect(logFilter.Address).To(Equal("0x8888f1f195afa192cfee860698584c030f4c9db1"))
Expect(logFilter.Topics).To(Equal(
core.Topics{
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"",
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
""}))
})
It("decodes array of web3 filters to []LogFilter", func() {
logFilters := make([]filters.LogFilter, 0)
jsonFilter := []byte(
`[{
"name": "TestEvent",
"fromBlock": "0x1",
"toBlock": "0x488290",
"address": "0x8888f1f195afa192cfee860698584c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
},
{
"name": "TestEvent2",
"fromBlock": "0x3",
"toBlock": "0x4",
"address": "0xd26114cd6EE289AccF82350c8d8487fedB8A0C07",
"topics": ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0x0000000000000000000000006b0949d4c6edfe467db78241b7d5566f3c2bb43e", "0x0000000000000000000000005e44c3e467a49c9ca0296a9f130fc433041aaa28"]
}]`)
err := json.Unmarshal(jsonFilter, &logFilters)
Expect(err).ToNot(HaveOccurred())
Expect(len(logFilters)).To(Equal(2))
Expect(logFilters[0].Name).To(Equal("TestEvent"))
Expect(logFilters[1].Name).To(Equal("TestEvent2"))
})
It("requires valid ethereum address", func() {
var logFilter filters.LogFilter
jsonFilter := []byte(
`{
"name": "TestEvent",
"fromBlock": "0x1",
"toBlock": "0x2",
"address": "0x8888f1f195afa192cf84c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
}`)
err := json.Unmarshal(jsonFilter, &logFilter)
Expect(err).To(HaveOccurred())
})
It("requires name", func() {
var logFilter filters.LogFilter
jsonFilter := []byte(
`{
"fromBlock": "0x1",
"toBlock": "0x2",
"address": "0x8888f1f195afa192cfee860698584c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
}`)
err := json.Unmarshal(jsonFilter, &logFilter)
Expect(err).To(HaveOccurred())
})
It("maps missing fromBlock to -1", func() {
var logFilter filters.LogFilter
jsonFilter := []byte(
`{
"name": "TestEvent",
"toBlock": "0x2",
"address": "0x8888f1f195afa192cfee860698584c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
}`)
err := json.Unmarshal(jsonFilter, &logFilter)
Expect(err).ToNot(HaveOccurred())
Expect(logFilter.FromBlock).To(Equal(int64(-1)))
})
It("maps missing toBlock to -1", func() {
var logFilter filters.LogFilter
jsonFilter := []byte(
`{
"name": "TestEvent",
"address": "0x8888f1f195afa192cfee860698584c030f4c9db1",
"topics": ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null, "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]
}`)
err := json.Unmarshal(jsonFilter, &logFilter)
Expect(err).ToNot(HaveOccurred())
Expect(logFilter.ToBlock).To(Equal(int64(-1)))
})
})

View File

@ -0,0 +1,13 @@
package filters_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestQueryBuilder(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "QueryBuilder Suite")
}

View File

@ -3,7 +3,10 @@ package repositories
import ( import (
"fmt" "fmt"
"errors"
"github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/filters"
) )
type InMemory struct { type InMemory struct {
@ -11,9 +14,19 @@ type InMemory struct {
receipts map[string]core.Receipt receipts map[string]core.Receipt
contracts map[string]core.Contract contracts map[string]core.Contract
logs map[string][]core.Log logs map[string][]core.Log
logFilters map[string]filters.LogFilter
CreateOrUpdateBlockCallCount int CreateOrUpdateBlockCallCount int
} }
func (repository *InMemory) AddFilter(filter filters.LogFilter) error {
key := filter.Name
if _, ok := repository.logFilters[key]; ok || key == "" {
return errors.New("filter name not unique")
}
repository.logFilters[key] = filter
return nil
}
func NewInMemory() *InMemory { func NewInMemory() *InMemory {
return &InMemory{ return &InMemory{
CreateOrUpdateBlockCallCount: 0, CreateOrUpdateBlockCallCount: 0,
@ -21,6 +34,7 @@ func NewInMemory() *InMemory {
receipts: make(map[string]core.Receipt), receipts: make(map[string]core.Receipt),
contracts: make(map[string]core.Contract), contracts: make(map[string]core.Contract),
logs: make(map[string][]core.Log), logs: make(map[string][]core.Log),
logFilters: make(map[string]filters.LogFilter),
} }
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/8thlight/vulcanizedb/pkg/config" "github.com/8thlight/vulcanizedb/pkg/config"
"github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/filters"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
_ "github.com/lib/pq" _ "github.com/lib/pq"
) )
@ -303,7 +304,7 @@ func (repository Postgres) createTransaction(tx *sql.Tx, blockId int64, transact
err := tx.QueryRow( err := tx.QueryRow(
`INSERT INTO transactions `INSERT INTO transactions
(block_id, tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value, tx_input_data) (block_id, tx_hash, tx_nonce, tx_to, tx_from, tx_gaslimit, tx_gasprice, tx_value, tx_input_data)
VALUES ($1, $2, $3, $4, $5, $6, $7, cast(NULLIF($8, '') as NUMERIC), $9) VALUES ($1, $2, $3, $4, $5, $6, $7, cast(NULLIF($8, '') AS NUMERIC), $9)
RETURNING id`, RETURNING id`,
blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.From, transaction.GasLimit, transaction.GasPrice, transaction.Value, transaction.Data). blockId, transaction.Hash, transaction.Nonce, transaction.To, transaction.From, transaction.GasLimit, transaction.GasPrice, transaction.Value, transaction.Data).
Scan(&transactionId) Scan(&transactionId)
@ -381,6 +382,18 @@ func (repository Postgres) CreateLogs(logs []core.Log) error {
return nil return nil
} }
func (repository Postgres) AddFilter(query filters.LogFilter) error {
_, err := repository.Db.Exec(
`INSERT INTO log_filters
(name, from_block, to_block, address, topic0, topic1, topic2, topic3)
VALUES ($1, NULLIF($2, -1), NULLIF($3, -1), $4, NULLIF($5, ''), NULLIF($6, ''), NULLIF($7, ''), NULLIF($8, ''))`,
query.Name, query.FromBlock, query.ToBlock, query.Address, query.Topics[0], query.Topics[1], query.Topics[2], query.Topics[3])
if err != nil {
return err
}
return nil
}
func loadReceipt(receiptsRow *sql.Row) (core.Receipt, error) { func loadReceipt(receiptsRow *sql.Row) (core.Receipt, error) {
var contractAddress string var contractAddress string
var txHash string var txHash string

View File

@ -1,6 +1,9 @@
package repositories package repositories
import "github.com/8thlight/vulcanizedb/pkg/core" import (
"github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/filters"
)
const ( const (
blocksFromHeadBeforeFinal = 20 blocksFromHeadBeforeFinal = 20
@ -19,4 +22,5 @@ type Repository interface {
CreateLogs(log []core.Log) error CreateLogs(log []core.Log) error
FindLogs(address string, blockNumber int64) []core.Log FindLogs(address string, blockNumber int64) []core.Log
SetBlocksStatus(chainHead int64) SetBlocksStatus(chainHead int64)
AddFilter(filter filters.LogFilter) error
} }

View File

@ -7,6 +7,7 @@ import (
"math/big" "math/big"
"github.com/8thlight/vulcanizedb/pkg/core" "github.com/8thlight/vulcanizedb/pkg/core"
"github.com/8thlight/vulcanizedb/pkg/filters"
"github.com/8thlight/vulcanizedb/pkg/repositories" "github.com/8thlight/vulcanizedb/pkg/repositories"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -18,6 +19,7 @@ func ClearData(postgres repositories.Postgres) {
postgres.Db.MustExec("DELETE FROM blocks") postgres.Db.MustExec("DELETE FROM blocks")
postgres.Db.MustExec("DELETE FROM logs") postgres.Db.MustExec("DELETE FROM logs")
postgres.Db.MustExec("DELETE FROM receipts") postgres.Db.MustExec("DELETE FROM receipts")
postgres.Db.MustExec("DELETE FROM log_filters")
} }
func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.Repository) { func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.Repository) {
@ -600,6 +602,43 @@ func AssertRepositoryBehavior(buildRepository func(node core.Node) repositories.
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(receipt).To(BeZero()) Expect(receipt).To(BeZero())
}) })
})
Describe("LogFilter", func() {
It("inserts filter into watched events", func() {
logFilter := filters.LogFilter{
Name: "TestFilter",
FromBlock: 1,
ToBlock: 2,
Address: "0x8888f1f195afa192cfee860698584c030f4c9db1",
Topics: core.Topics{
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"",
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"",
},
}
err := repository.AddFilter(logFilter)
Expect(err).ToNot(HaveOccurred())
})
It("returns error if name is not provided", func() {
logFilter := filters.LogFilter{
FromBlock: 1,
ToBlock: 2,
Address: "0x8888f1f195afa192cfee860698584c030f4c9db1",
Topics: core.Topics{
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"",
"0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b",
"",
},
}
err := repository.AddFilter(logFilter)
Expect(err).To(HaveOccurred())
})
}) })
} }