diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index d3e0e0b9..633993fa 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -170,7 +170,10 @@ func composeAndExecute() { var wg syn.WaitGroup if len(ethEventInitializers) > 0 { ew := watcher.NewEventWatcher(&db, blockChain) - ew.AddTransformers(ethEventInitializers) + err := ew.AddTransformers(ethEventInitializers) + if err != nil { + LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) + } wg.Add(1) go watchEthEvents(&ew, &wg) } diff --git a/cmd/execute.go b/cmd/execute.go index 108eba82..1a71b297 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -114,7 +114,10 @@ func execute() { var wg syn.WaitGroup if len(ethEventInitializers) > 0 { ew := watcher.NewEventWatcher(&db, blockChain) - ew.AddTransformers(ethEventInitializers) + err = ew.AddTransformers(ethEventInitializers) + if err != nil { + LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error()) + } wg.Add(1) go watchEthEvents(&ew, &wg) } diff --git a/db/migrations/00029_create_checked_logs_table.sql b/db/migrations/00029_create_checked_logs_table.sql new file mode 100644 index 00000000..91445cd9 --- /dev/null +++ b/db/migrations/00029_create_checked_logs_table.sql @@ -0,0 +1,12 @@ +-- +goose Up +-- SQL in this section is executed when the migration is applied. +CREATE TABLE public.checked_logs +( + id SERIAL PRIMARY KEY, + contract_address VARCHAR(42), + topic_zero VARCHAR(66) +); + +-- +goose Down +-- SQL in this section is executed when the migration is rolled back. +DROP TABLE public.checked_logs; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index 6942d6a9..16791a83 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -156,6 +156,37 @@ CREATE SEQUENCE public.checked_headers_id_seq ALTER SEQUENCE public.checked_headers_id_seq OWNED BY public.checked_headers.id; +-- +-- Name: checked_logs; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.checked_logs ( + id integer NOT NULL, + contract_address character varying(42), + topic_zero character varying(66) +); + + +-- +-- Name: checked_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: - +-- + +CREATE SEQUENCE public.checked_logs_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: checked_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: - +-- + +ALTER SEQUENCE public.checked_logs_id_seq OWNED BY public.checked_logs.id; + + -- -- Name: eth_nodes; Type: TABLE; Schema: public; Owner: - -- @@ -656,6 +687,13 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('public.checked_headers_id_seq'::regclass); +-- +-- Name: checked_logs id; Type: DEFAULT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_logs ALTER COLUMN id SET DEFAULT nextval('public.checked_logs_id_seq'::regclass); + + -- -- Name: eth_nodes id; Type: DEFAULT; Schema: public; Owner: - -- @@ -787,6 +825,14 @@ ALTER TABLE ONLY public.checked_headers ADD CONSTRAINT checked_headers_pkey PRIMARY KEY (id); +-- +-- Name: checked_logs checked_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: - +-- + +ALTER TABLE ONLY public.checked_logs + ADD CONSTRAINT checked_logs_pkey PRIMARY KEY (id); + + -- -- Name: blocks eth_node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: - -- diff --git a/go.mod b/go.mod index 7dc9f4c0..3f13c0a6 100644 --- a/go.mod +++ b/go.mod @@ -6,28 +6,44 @@ require ( github.com/allegro/bigcache v1.2.1 // indirect github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect github.com/dave/jennifer v1.3.0 + github.com/deckarep/golang-set v1.7.1 // indirect + github.com/edsrzf/mmap-go v1.0.0 // indirect + github.com/elastic/gosigar v0.10.4 // indirect github.com/ethereum/go-ethereum v1.9.1 + github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect + github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect + github.com/gorilla/websocket v1.4.1 // indirect github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 // indirect github.com/hashicorp/golang-lru v0.5.1 github.com/hpcloud/tail v1.0.0 + github.com/huin/goupnp v1.0.0 // indirect + github.com/jackpal/go-nat-pmp v1.0.1 // indirect github.com/jmoiron/sqlx v0.0.0-20181024163419-82935fac6c1a github.com/karalabe/usb v0.0.0-20190819132248-550797b1cad8 // indirect github.com/lib/pq v1.0.0 + github.com/mattn/go-colorable v0.1.2 // indirect github.com/mattn/go-isatty v0.0.9 // indirect + github.com/mattn/go-runewidth v0.0.4 // indirect github.com/mitchellh/go-homedir v1.1.0 + github.com/olekukonko/tablewriter v0.0.1 // indirect github.com/onsi/ginkgo v1.7.0 github.com/onsi/gomega v1.4.3 github.com/pborman/uuid v1.2.0 // indirect github.com/pressly/goose v2.6.0+incompatible github.com/prometheus/tsdb v0.10.0 // indirect + github.com/rjeczalik/notify v0.9.2 // indirect github.com/rs/cors v1.7.0 // indirect github.com/sirupsen/logrus v1.2.0 github.com/spf13/cobra v0.0.3 github.com/spf13/viper v1.3.2 + github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect + github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tyler-smith/go-bip39 v1.0.2 // indirect + github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect golang.org/x/net v0.0.0-20190603091049-60506f45cf65 golang.org/x/sync v0.0.0-20190423024810-112230192c58 + gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 ) diff --git a/go.sum b/go.sum index 976da9ae..fa407d6f 100644 --- a/go.sum +++ b/go.sum @@ -60,6 +60,7 @@ github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -67,9 +68,11 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= @@ -78,6 +81,8 @@ github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s= github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= +github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v0.0.0-20190610161739-8f92f34fc598/go.mod h1:Au3iQ8DvDis8hZ4q2OzRcaKYlAsPt+fYvib5q4nIqu4= github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 h1:9WiNlI9Cds5S5YITwRpRs8edNaq0nxTEymhDW20A1QE= github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1:Au3iQ8DvDis8hZ4q2OzRcaKYlAsPt+fYvib5q4nIqu4= @@ -132,6 +137,7 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y= github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= +github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v1.11.0 h1:LDdKkqtYlom37fkvqs8rMPFKAMe8+SgjbwZ6ex1/A/Q= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= @@ -166,6 +172,7 @@ github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -226,6 +233,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs= github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= +github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= +github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/tyler-smith/go-bip39 v1.0.0/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8= github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs= diff --git a/libraries/shared/logs/extractor.go b/libraries/shared/logs/extractor.go index f8132352..aed39d02 100644 --- a/libraries/shared/logs/extractor.go +++ b/libraries/shared/logs/extractor.go @@ -36,13 +36,14 @@ const ( ) type ILogExtractor interface { - AddTransformerConfig(config transformer.EventTransformerConfig) + AddTransformerConfig(config transformer.EventTransformerConfig) error ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) } type LogExtractor struct { Addresses []common.Address CheckedHeadersRepository datastore.CheckedHeadersRepository + CheckedLogsRepository datastore.CheckedLogsRepository Fetcher fetcher.ILogFetcher LogRepository datastore.HeaderSyncLogRepository StartingBlock *int64 @@ -51,7 +52,12 @@ type LogExtractor struct { } // Add additional logs to extract -func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) { +func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error { + checkedHeadersErr := extractor.updateCheckedHeaders(config) + if checkedHeadersErr != nil { + return checkedHeadersErr + } + if extractor.StartingBlock == nil { extractor.StartingBlock = &config.StartingBlockNumber } else if earlierStartingBlockNumber(config.StartingBlockNumber, *extractor.StartingBlock) { @@ -61,6 +67,7 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran addresses := transformer.HexStringsToAddresses(config.ContractAddresses) extractor.Addresses = append(extractor.Addresses, addresses...) extractor.Topics = append(extractor.Topics, common.HexToHash(config.Topic)) + return nil } // Fetch and persist watched logs @@ -129,3 +136,21 @@ func getCheckCount(recheckHeaders constants.TransformerExecution) int64 { return constants.RecheckHeaderCap } } + +func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error { + hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic) + if hasBeenCheckedErr != nil { + return hasBeenCheckedErr + } + if !hasBeenChecked { + err := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber) + if err != nil { + return err + } + nextErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic) + if nextErr != nil { + return nextErr + } + } + return nil +} diff --git a/libraries/shared/logs/extractor_test.go b/libraries/shared/logs/extractor_test.go index d21f2f6a..1ce47cc7 100644 --- a/libraries/shared/logs/extractor_test.go +++ b/libraries/shared/logs/extractor_test.go @@ -31,42 +31,52 @@ import ( ) var _ = Describe("Log extractor", func() { - var extractor *logs.LogExtractor + var ( + checkedHeadersRepository *fakes.MockCheckedHeadersRepository + checkedLogsRepository *fakes.MockCheckedLogsRepository + extractor *logs.LogExtractor + ) BeforeEach(func() { + checkedHeadersRepository = &fakes.MockCheckedHeadersRepository{} + checkedLogsRepository = &fakes.MockCheckedLogsRepository{} extractor = &logs.LogExtractor{ + CheckedHeadersRepository: checkedHeadersRepository, + CheckedLogsRepository: checkedLogsRepository, Fetcher: &mocks.MockLogFetcher{}, - CheckedHeadersRepository: &fakes.MockCheckedHeadersRepository{}, LogRepository: &fakes.MockHeaderSyncLogRepository{}, Syncer: &fakes.MockTransactionSyncer{}, } }) Describe("AddTransformerConfig", func() { - It("it includes earliest starting block number in fetch logs query", func() { + It("updates extractor's starting block number to earliest available", func() { earlierStartingBlockNumber := rand.Int63() laterStartingBlockNumber := earlierStartingBlockNumber + 1 - extractor.AddTransformerConfig(getTransformerConfig(laterStartingBlockNumber)) - extractor.AddTransformerConfig(getTransformerConfig(earlierStartingBlockNumber)) + errOne := extractor.AddTransformerConfig(getTransformerConfig(laterStartingBlockNumber)) + Expect(errOne).NotTo(HaveOccurred()) + errTwo := extractor.AddTransformerConfig(getTransformerConfig(earlierStartingBlockNumber)) + Expect(errTwo).NotTo(HaveOccurred()) Expect(*extractor.StartingBlock).To(Equal(earlierStartingBlockNumber)) }) - It("includes added addresses in fetch logs query", func() { + It("adds transformer's addresses to extractor's watched addresses", func() { addresses := []string{"0xA", "0xB"} configWithAddresses := transformer.EventTransformerConfig{ ContractAddresses: addresses, StartingBlockNumber: rand.Int63(), } - extractor.AddTransformerConfig(configWithAddresses) + err := extractor.AddTransformerConfig(configWithAddresses) + Expect(err).NotTo(HaveOccurred()) expectedAddresses := transformer.HexStringsToAddresses(addresses) Expect(extractor.Addresses).To(Equal(expectedAddresses)) }) - It("includes added topics in fetch logs query", func() { + It("adds transformer's topic to extractor's watched topics", func() { topic := "0x1" configWithTopic := transformer.EventTransformerConfig{ ContractAddresses: []string{fakes.FakeAddress.Hex()}, @@ -74,10 +84,75 @@ var _ = Describe("Log extractor", func() { StartingBlockNumber: rand.Int63(), } - extractor.AddTransformerConfig(configWithTopic) + err := extractor.AddTransformerConfig(configWithTopic) + Expect(err).NotTo(HaveOccurred()) Expect(extractor.Topics).To(Equal([]common.Hash{common.HexToHash(topic)})) }) + + It("returns error if checking whether log has been checked returns error", func() { + checkedLogsRepository.HasLogBeenCheckedError = fakes.FakeError + + err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + Describe("when log has previously been checked", func() { + It("does not mark any headers unchecked", func() { + checkedLogsRepository.HasLogBeenCheckedReturn = true + + err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) + + Expect(err).NotTo(HaveOccurred()) + Expect(checkedHeadersRepository.MarkHeadersUncheckedCalled).To(BeFalse()) + }) + }) + + Describe("when log has not previously been checked", func() { + BeforeEach(func() { + checkedLogsRepository.HasLogBeenCheckedReturn = false + }) + + It("marks headers since transformer's starting block number as unchecked", func() { + blockNumber := rand.Int63() + + err := extractor.AddTransformerConfig(getTransformerConfig(blockNumber)) + + Expect(err).NotTo(HaveOccurred()) + Expect(checkedHeadersRepository.MarkHeadersUncheckedCalled).To(BeTrue()) + Expect(checkedHeadersRepository.MarkHeadersUncheckedStartingBlockNumber).To(Equal(blockNumber)) + }) + + It("returns error if marking headers unchecked returns error", func() { + checkedHeadersRepository.MarkHeadersUncheckedReturnError = fakes.FakeError + + err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("persists that tranformer's log has been checked", func() { + config := getTransformerConfig(rand.Int63()) + + err := extractor.AddTransformerConfig(config) + + Expect(err).NotTo(HaveOccurred()) + Expect(checkedLogsRepository.MarkLogCheckedAddresses).To(Equal(config.ContractAddresses)) + Expect(checkedLogsRepository.MarkLogCheckedTopicZero).To(Equal(config.Topic)) + }) + + It("returns error if marking logs checked returns error", func() { + checkedLogsRepository.MarkLogCheckedError = fakes.FakeError + + err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63())) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + }) }) Describe("ExtractLogs", func() { @@ -91,7 +166,7 @@ var _ = Describe("Log extractor", func() { Describe("when checking missing headers", func() { It("gets missing headers since configured starting block with check_count < 1", func() { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} + mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) @@ -99,16 +174,16 @@ var _ = Describe("Log extractor", func() { err, _ := extractor.ExtractLogs(constants.HeaderMissing) Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) - Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) - Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1))) + Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) + Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1))) + Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(int64(1))) }) }) Describe("when rechecking headers", func() { It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func() { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} + mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository startingBlockNumber := rand.Int63() extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber)) @@ -116,9 +191,9 @@ var _ = Describe("Log extractor", func() { err, _ := extractor.ExtractLogs(constants.HeaderRecheck) Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber)) - Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1))) - Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap)) + Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber)) + Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1))) + Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(constants.RecheckHeaderCap)) }) }) @@ -274,20 +349,20 @@ var _ = Describe("Log extractor", func() { addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} headerID := rand.Int63() - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}} + mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: headerID}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository err, _ := extractor.ExtractLogs(constants.HeaderMissing) Expect(err).NotTo(HaveOccurred()) - Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID)) + Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID)) }) It("returns error if marking header checked fails", func() { addFetchedLog(extractor) addTransformerConfig(extractor) mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}} + mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: rand.Int63()}} mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError extractor.CheckedHeadersRepository = mockCheckedHeadersRepository @@ -321,7 +396,7 @@ func addTransformerConfig(extractor *logs.LogExtractor) { func addMissingHeader(extractor *logs.LogExtractor) { mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{} - mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}} + mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}} extractor.CheckedHeadersRepository = mockCheckedHeadersRepository } diff --git a/libraries/shared/mocks/log_extractor.go b/libraries/shared/mocks/log_extractor.go index 1cb26132..54511f98 100644 --- a/libraries/shared/mocks/log_extractor.go +++ b/libraries/shared/mocks/log_extractor.go @@ -22,14 +22,16 @@ import ( ) type MockLogExtractor struct { - AddedConfigs []transformer.EventTransformerConfig - ExtractLogsCount int - ExtractLogsErrors []error - MissingHeadersExist []bool + AddedConfigs []transformer.EventTransformerConfig + AddTransformerConfigError error + ExtractLogsCount int + ExtractLogsErrors []error + MissingHeadersExist []bool } -func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) { +func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error { extractor.AddedConfigs = append(extractor.AddedConfigs, config) + return extractor.AddTransformerConfigError } func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) { diff --git a/libraries/shared/watcher/event_watcher.go b/libraries/shared/watcher/event_watcher.go index e10da6c6..04386c63 100644 --- a/libraries/shared/watcher/event_watcher.go +++ b/libraries/shared/watcher/event_watcher.go @@ -41,8 +41,9 @@ type EventWatcher struct { func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { extractor := &logs.LogExtractor{ - Fetcher: fetcher.NewLogFetcher(bc), CheckedHeadersRepository: repositories.NewCheckedHeadersRepository(db), + CheckedLogsRepository: repositories.NewCheckedLogsRepository(db), + Fetcher: fetcher.NewLogFetcher(bc), LogRepository: repositories.NewHeaderSyncLogRepository(db), Syncer: transactions.NewTransactionsSyncer(db, bc), } @@ -59,13 +60,17 @@ func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher { } // Adds transformers to the watcher so that their logs will be extracted and delegated. -func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTransformerInitializer) { +func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTransformerInitializer) error { for _, initializer := range initializers { t := initializer(watcher.db) watcher.LogDelegator.AddTransformer(t) - watcher.LogExtractor.AddTransformerConfig(t.GetConfig()) + err := watcher.LogExtractor.AddTransformerConfig(t.GetConfig()) + if err != nil { + return err + } } + return nil } // Extracts and delegates watched log events. diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository.go b/pkg/datastore/postgres/repositories/checked_headers_repository.go index 341cb272..1e33e425 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository.go @@ -41,6 +41,12 @@ func (repo CheckedHeadersRepository) MarkHeaderChecked(headerID int64) error { return err } +// Remove checked_headers rows with block number >= starting block number +func (repo CheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error { + _, err := repo.db.Exec(`DELETE FROM public.checked_headers WHERE header_id IN (SELECT id FROM public.headers WHERE block_number >= $1)`, startingBlockNumber) + return err +} + // Return header_id if not present in checked_headers or its check_count is < passed checkCount func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { var result []core.Header diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go index c9a9284a..5ee5780b 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go @@ -73,6 +73,47 @@ var _ = Describe("Checked Headers repository", func() { }) }) + Describe("MarkHeadersUnchecked", func() { + It("removes rows for headers <= starting block number", func() { + blockNumberOne := rand.Int63() + blockNumberTwo := blockNumberOne + 1 + blockNumberThree := blockNumberOne + 2 + fakeHeaderOne := fakes.GetFakeHeader(blockNumberOne) + fakeHeaderTwo := fakes.GetFakeHeader(blockNumberTwo) + fakeHeaderThree := fakes.GetFakeHeader(blockNumberThree) + headerRepository := repositories.NewHeaderRepository(db) + // insert three headers with incrementing block number + headerIdOne, insertHeaderOneErr := headerRepository.CreateOrUpdateHeader(fakeHeaderOne) + Expect(insertHeaderOneErr).NotTo(HaveOccurred()) + headerIdTwo, insertHeaderTwoErr := headerRepository.CreateOrUpdateHeader(fakeHeaderTwo) + Expect(insertHeaderTwoErr).NotTo(HaveOccurred()) + headerIdThree, insertHeaderThreeErr := headerRepository.CreateOrUpdateHeader(fakeHeaderThree) + Expect(insertHeaderThreeErr).NotTo(HaveOccurred()) + // mark all headers checked + markHeaderOneCheckedErr := repo.MarkHeaderChecked(headerIdOne) + Expect(markHeaderOneCheckedErr).NotTo(HaveOccurred()) + markHeaderTwoCheckedErr := repo.MarkHeaderChecked(headerIdTwo) + Expect(markHeaderTwoCheckedErr).NotTo(HaveOccurred()) + markHeaderThreeCheckedErr := repo.MarkHeaderChecked(headerIdThree) + Expect(markHeaderThreeCheckedErr).NotTo(HaveOccurred()) + + // mark headers unchecked since blockNumberTwo + err := repo.MarkHeadersUnchecked(blockNumberTwo) + + Expect(err).NotTo(HaveOccurred()) + var headerOneChecked, headerTwoChecked, headerThreeChecked bool + getHeaderOneErr := db.Get(&headerOneChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdOne) + Expect(getHeaderOneErr).NotTo(HaveOccurred()) + Expect(headerOneChecked).To(BeTrue()) + getHeaderTwoErr := db.Get(&headerTwoChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdTwo) + Expect(getHeaderTwoErr).NotTo(HaveOccurred()) + Expect(headerTwoChecked).To(BeFalse()) + getHeaderThreeErr := db.Get(&headerThreeChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdThree) + Expect(getHeaderThreeErr).NotTo(HaveOccurred()) + Expect(headerThreeChecked).To(BeFalse()) + }) + }) + Describe("MissingHeaders", func() { var ( headerRepository datastore.HeaderRepository diff --git a/pkg/datastore/postgres/repositories/checked_logs_repository.go b/pkg/datastore/postgres/repositories/checked_logs_repository.go new file mode 100644 index 00000000..113be3ed --- /dev/null +++ b/pkg/datastore/postgres/repositories/checked_logs_repository.go @@ -0,0 +1,69 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repositories + +import ( + "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type CheckedLogsRepository struct { + db *postgres.DB +} + +func NewCheckedLogsRepository(db *postgres.DB) CheckedLogsRepository { + return CheckedLogsRepository{db: db} +} + +// Return whether a given address + topic0 has been fetched on a previous run of vDB +func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) { + for _, address := range addresses { + var addressExists bool + getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1)`, address) + if getAddressExistsErr != nil { + return false, getAddressExistsErr + } + if !addressExists { + return false, nil + } + } + var topicZeroExists bool + getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE topic_zero = $1)`, topic0) + if getTopicZeroExistsErr != nil { + return false, getTopicZeroExistsErr + } + return topicZeroExists, nil +} + +// Persist that a given address + topic0 has is being fetched on this run of vDB +func (repository CheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error { + tx, txErr := repository.db.Beginx() + if txErr != nil { + return txErr + } + for _, address := range addresses { + _, insertErr := tx.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0) + if insertErr != nil { + rollbackErr := tx.Rollback() + if rollbackErr != nil { + logrus.Errorf("error rolling back transaction inserting checked logs: %s", rollbackErr.Error()) + } + return insertErr + } + } + return tx.Commit() +} diff --git a/pkg/datastore/postgres/repositories/checked_logs_repository_test.go b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go new file mode 100644 index 00000000..351bbcaf --- /dev/null +++ b/pkg/datastore/postgres/repositories/checked_logs_repository_test.go @@ -0,0 +1,110 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package repositories_test + +import ( + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/datastore" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Checked logs repository", func() { + var ( + db *postgres.DB + fakeAddress = fakes.FakeAddress.Hex() + fakeAddresses = []string{fakeAddress} + fakeTopicZero = fakes.FakeHash.Hex() + repository datastore.CheckedLogsRepository + ) + + BeforeEach(func() { + db = test_config.NewTestDB(test_config.NewTestNode()) + test_config.CleanTestDB(db) + repository = repositories.NewCheckedLogsRepository(db) + }) + + Describe("HaveLogsBeenChecked", func() { + It("returns true if all addresses and the topic0 are already present in the db", func() { + _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) + Expect(insertErr).NotTo(HaveOccurred()) + + hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + + Expect(err).NotTo(HaveOccurred()) + Expect(hasBeenChecked).To(BeTrue()) + }) + + It("returns true if addresses and topic0 were fetched because of a combination of other transformers", func() { + anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() + anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex() + // insert row with matching address but different topic0 + _, insertOneErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) + Expect(insertOneErr).NotTo(HaveOccurred()) + // insert row with matching topic0 but different address + _, insertTwoErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero) + Expect(insertTwoErr).NotTo(HaveOccurred()) + + hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + + Expect(err).NotTo(HaveOccurred()) + Expect(hasBeenChecked).To(BeTrue()) + }) + + It("returns false if any address has not been checked", func() { + anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() + _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero) + Expect(insertErr).NotTo(HaveOccurred()) + + hasBeenChecked, err := repository.HaveLogsBeenChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) + + Expect(err).NotTo(HaveOccurred()) + Expect(hasBeenChecked).To(BeFalse()) + }) + + It("returns false if topic0 has not been checked", func() { + anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex() + _, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero) + Expect(insertErr).NotTo(HaveOccurred()) + + hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero) + + Expect(err).NotTo(HaveOccurred()) + Expect(hasBeenChecked).To(BeFalse()) + }) + }) + + Describe("MarkLogsChecked", func() { + It("adds a row for all of transformer's addresses + topic0", func() { + anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex() + err := repository.MarkLogsChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero) + + Expect(err).NotTo(HaveOccurred()) + var comboOneExists, comboTwoExists bool + getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero) + Expect(getComboOneErr).NotTo(HaveOccurred()) + Expect(comboOneExists).To(BeTrue()) + getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero) + Expect(getComboTwoErr).NotTo(HaveOccurred()) + Expect(comboTwoExists).To(BeTrue()) + }) + }) +}) diff --git a/pkg/datastore/repository.go b/pkg/datastore/repository.go index 2fbeaf9e..f762bb12 100644 --- a/pkg/datastore/repository.go +++ b/pkg/datastore/repository.go @@ -36,9 +36,15 @@ type BlockRepository interface { type CheckedHeadersRepository interface { MarkHeaderChecked(headerID int64) error + MarkHeadersUnchecked(startingBlockNumber int64) error MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) } +type CheckedLogsRepository interface { + HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) + MarkLogsChecked(addresses []string, topic0 string) error +} + type ContractRepository interface { CreateContract(contract core.Contract) error GetContract(contractHash string) (core.Contract, error) diff --git a/pkg/fakes/checked_logs_repository.go b/pkg/fakes/checked_logs_repository.go new file mode 100644 index 00000000..fca57e96 --- /dev/null +++ b/pkg/fakes/checked_logs_repository.go @@ -0,0 +1,39 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package fakes + +type MockCheckedLogsRepository struct { + HasLogBeenCheckedAddresses []string + HasLogBeenCheckedError error + HasLogBeenCheckedReturn bool + HasLogBeenCheckedTopicZero string + MarkLogCheckedAddresses []string + MarkLogCheckedError error + MarkLogCheckedTopicZero string +} + +func (repository *MockCheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) { + repository.HasLogBeenCheckedAddresses = addresses + repository.HasLogBeenCheckedTopicZero = topic0 + return repository.HasLogBeenCheckedReturn, repository.HasLogBeenCheckedError +} + +func (repository *MockCheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error { + repository.MarkLogCheckedAddresses = addresses + repository.MarkLogCheckedTopicZero = topic0 + return repository.MarkLogCheckedError +} diff --git a/pkg/fakes/data.go b/pkg/fakes/data.go index cebcfb2a..3c6b7046 100644 --- a/pkg/fakes/data.go +++ b/pkg/fakes/data.go @@ -20,16 +20,16 @@ import ( "bytes" "encoding/json" "errors" - "strconv" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/vulcanize/vulcanizedb/pkg/core" + "math/rand" + "strconv" + "time" ) var ( - FakeAddress = common.HexToAddress("0x1234567890abcdef") + FakeAddress = common.HexToAddress("0x" + RandomString(40)) FakeError = errors.New("failed") FakeHash = common.BytesToHash([]byte{1, 2, 3, 4, 5}) fakeTimestamp = int64(111111111) @@ -103,3 +103,15 @@ func GetFakeUncle(hash, reward string) core.Uncle { Timestamp: strconv.FormatInt(fakeTimestamp, 10), } } + +func RandomString(length int) string { + var seededRand = rand.New( + rand.NewSource(time.Now().UnixNano())) + charset := "abcdef1234567890" + b := make([]byte, length) + for i := range b { + b[i] = charset[seededRand.Intn(len(charset))] + } + + return string(b) +} diff --git a/pkg/fakes/mock_checked_headers_repository.go b/pkg/fakes/mock_checked_headers_repository.go index 4e574b07..687bf160 100644 --- a/pkg/fakes/mock_checked_headers_repository.go +++ b/pkg/fakes/mock_checked_headers_repository.go @@ -21,23 +21,32 @@ import ( ) type MockCheckedHeadersRepository struct { - CheckCount int64 - StartingBlockNumber int64 - EndingBlockNumber int64 - HeaderID int64 - ReturnHeaders []core.Header - MarkHeaderCheckedReturnError error - MissingHeadersReturnError error + MarkHeaderCheckedHeaderID int64 + MarkHeaderCheckedReturnError error + MarkHeadersUncheckedCalled bool + MarkHeadersUncheckedReturnError error + MarkHeadersUncheckedStartingBlockNumber int64 + MissingHeadersCheckCount int64 + MissingHeadersEndingBlockNumber int64 + MissingHeadersReturnError error + MissingHeadersReturnHeaders []core.Header + MissingHeadersStartingBlockNumber int64 +} + +func (repository *MockCheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error { + repository.MarkHeadersUncheckedCalled = true + repository.MarkHeadersUncheckedStartingBlockNumber = startingBlockNumber + return repository.MarkHeadersUncheckedReturnError } func (repository *MockCheckedHeadersRepository) MarkHeaderChecked(headerID int64) error { - repository.HeaderID = headerID + repository.MarkHeaderCheckedHeaderID = headerID return repository.MarkHeaderCheckedReturnError } func (repository *MockCheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) { - repository.StartingBlockNumber = startingBlockNumber - repository.EndingBlockNumber = endingBlockNumber - repository.CheckCount = checkCount - return repository.ReturnHeaders, repository.MissingHeadersReturnError + repository.MissingHeadersStartingBlockNumber = startingBlockNumber + repository.MissingHeadersEndingBlockNumber = endingBlockNumber + repository.MissingHeadersCheckCount = checkCount + return repository.MissingHeadersReturnHeaders, repository.MissingHeadersReturnError } diff --git a/test_config/test_config.go b/test_config/test_config.go index f9fb3975..f3dad64e 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -106,6 +106,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM addresses") db.MustExec("DELETE FROM blocks") db.MustExec("DELETE FROM checked_headers") + db.MustExec("DELETE FROM checked_logs") // can't delete from eth_nodes since this function is called after the required eth_node is persisted db.MustExec("DELETE FROM full_sync_logs") db.MustExec("DELETE FROM full_sync_receipts")