Update checked headers for new transformers

- If a header was marked as checked before a transformer was added to
  the watcher, mark all headers since the new transformer's starting
  block number as unchecked.
This commit is contained in:
Rob Mulholand 2019-08-26 16:43:14 -05:00
parent d76be4962b
commit 666ea1c325
19 changed files with 538 additions and 49 deletions

View File

@ -170,7 +170,10 @@ func composeAndExecute() {
var wg syn.WaitGroup
if len(ethEventInitializers) > 0 {
ew := watcher.NewEventWatcher(&db, blockChain)
ew.AddTransformers(ethEventInitializers)
err := ew.AddTransformers(ethEventInitializers)
if err != nil {
LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error())
}
wg.Add(1)
go watchEthEvents(&ew, &wg)
}

View File

@ -114,7 +114,10 @@ func execute() {
var wg syn.WaitGroup
if len(ethEventInitializers) > 0 {
ew := watcher.NewEventWatcher(&db, blockChain)
ew.AddTransformers(ethEventInitializers)
err = ew.AddTransformers(ethEventInitializers)
if err != nil {
LogWithCommand.Fatalf("failed to add event transformer initializers to watcher: %s", err.Error())
}
wg.Add(1)
go watchEthEvents(&ew, &wg)
}

View File

@ -0,0 +1,12 @@
-- +goose Up
-- SQL in this section is executed when the migration is applied.
CREATE TABLE public.checked_logs
(
id SERIAL PRIMARY KEY,
contract_address VARCHAR(42),
topic_zero VARCHAR(66)
);
-- +goose Down
-- SQL in this section is executed when the migration is rolled back.
DROP TABLE public.checked_logs;

View File

@ -156,6 +156,37 @@ CREATE SEQUENCE public.checked_headers_id_seq
ALTER SEQUENCE public.checked_headers_id_seq OWNED BY public.checked_headers.id;
--
-- Name: checked_logs; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.checked_logs (
id integer NOT NULL,
contract_address character varying(42),
topic_zero character varying(66)
);
--
-- Name: checked_logs_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.checked_logs_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: checked_logs_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.checked_logs_id_seq OWNED BY public.checked_logs.id;
--
-- Name: eth_nodes; Type: TABLE; Schema: public; Owner: -
--
@ -656,6 +687,13 @@ ALTER TABLE ONLY public.blocks ALTER COLUMN id SET DEFAULT nextval('public.block
ALTER TABLE ONLY public.checked_headers ALTER COLUMN id SET DEFAULT nextval('public.checked_headers_id_seq'::regclass);
--
-- Name: checked_logs id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.checked_logs ALTER COLUMN id SET DEFAULT nextval('public.checked_logs_id_seq'::regclass);
--
-- Name: eth_nodes id; Type: DEFAULT; Schema: public; Owner: -
--
@ -787,6 +825,14 @@ ALTER TABLE ONLY public.checked_headers
ADD CONSTRAINT checked_headers_pkey PRIMARY KEY (id);
--
-- Name: checked_logs checked_logs_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.checked_logs
ADD CONSTRAINT checked_logs_pkey PRIMARY KEY (id);
--
-- Name: blocks eth_node_id_block_number_uc; Type: CONSTRAINT; Schema: public; Owner: -
--

16
go.mod
View File

@ -6,28 +6,44 @@ require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20190712234253-ed1100a1c015 // indirect
github.com/dave/jennifer v1.3.0
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/elastic/gosigar v0.10.4 // indirect
github.com/ethereum/go-ethereum v1.9.1
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 // indirect
github.com/hashicorp/golang-lru v0.5.1
github.com/hpcloud/tail v1.0.0
github.com/huin/goupnp v1.0.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.1 // indirect
github.com/jmoiron/sqlx v0.0.0-20181024163419-82935fac6c1a
github.com/karalabe/usb v0.0.0-20190819132248-550797b1cad8 // indirect
github.com/lib/pq v1.0.0
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/mattn/go-isatty v0.0.9 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mitchellh/go-homedir v1.1.0
github.com/olekukonko/tablewriter v0.0.1 // indirect
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/pborman/uuid v1.2.0 // indirect
github.com/pressly/goose v2.6.0+incompatible
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rs/cors v1.7.0 // indirect
github.com/sirupsen/logrus v1.2.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/tyler-smith/go-bip39 v1.0.2 // indirect
github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 // indirect
golang.org/x/net v0.0.0-20190603091049-60506f45cf65
golang.org/x/sync v0.0.0-20190423024810-112230192c58
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190709231704-1e4459ed25ff // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7
)

9
go.sum
View File

@ -60,6 +60,7 @@ github.com/go-kit/kit v0.8.0 h1:Wz+5lgoB0kkuqLEc6NVmwRknTKP6dTGbSqvhZtBI/j0=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-sql-driver/mysql v1.4.0 h1:7LxgVwFb2hIQtMm87NdgAVfXjnt4OePseqT1tKx+opk=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -67,9 +68,11 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
@ -78,6 +81,8 @@ github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v0.0.0-20190610161739-8f92f34fc598/go.mod h1:Au3iQ8DvDis8hZ4q2OzRcaKYlAsPt+fYvib5q4nIqu4=
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6 h1:9WiNlI9Cds5S5YITwRpRs8edNaq0nxTEymhDW20A1QE=
github.com/graph-gophers/graphql-go v0.0.0-20190724201507-010347b5f9e6/go.mod h1:Au3iQ8DvDis8hZ4q2OzRcaKYlAsPt+fYvib5q4nIqu4=
@ -132,6 +137,7 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.9.0 h1:pDRiWfl+++eC2FEFRy6jXmQlvp4Yh3z1MJKg4UeYM/4=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.11.0 h1:LDdKkqtYlom37fkvqs8rMPFKAMe8+SgjbwZ6ex1/A/Q=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
@ -166,6 +172,7 @@ github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -226,6 +233,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tyler-smith/go-bip39 v1.0.0/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/tyler-smith/go-bip39 v1.0.2 h1:+t3w+KwLXO6154GNJY+qUtIxLTmFjfUmpguQT1OlOT8=
github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=

View File

@ -36,13 +36,14 @@ const (
)
type ILogExtractor interface {
AddTransformerConfig(config transformer.EventTransformerConfig)
AddTransformerConfig(config transformer.EventTransformerConfig) error
ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool)
}
type LogExtractor struct {
Addresses []common.Address
CheckedHeadersRepository datastore.CheckedHeadersRepository
CheckedLogsRepository datastore.CheckedLogsRepository
Fetcher fetcher.ILogFetcher
LogRepository datastore.HeaderSyncLogRepository
StartingBlock *int64
@ -51,7 +52,12 @@ type LogExtractor struct {
}
// Add additional logs to extract
func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) {
func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error {
checkedHeadersErr := extractor.updateCheckedHeaders(config)
if checkedHeadersErr != nil {
return checkedHeadersErr
}
if extractor.StartingBlock == nil {
extractor.StartingBlock = &config.StartingBlockNumber
} else if earlierStartingBlockNumber(config.StartingBlockNumber, *extractor.StartingBlock) {
@ -61,6 +67,7 @@ func (extractor *LogExtractor) AddTransformerConfig(config transformer.EventTran
addresses := transformer.HexStringsToAddresses(config.ContractAddresses)
extractor.Addresses = append(extractor.Addresses, addresses...)
extractor.Topics = append(extractor.Topics, common.HexToHash(config.Topic))
return nil
}
// Fetch and persist watched logs
@ -129,3 +136,21 @@ func getCheckCount(recheckHeaders constants.TransformerExecution) int64 {
return constants.RecheckHeaderCap
}
}
func (extractor *LogExtractor) updateCheckedHeaders(config transformer.EventTransformerConfig) error {
hasBeenChecked, hasBeenCheckedErr := extractor.CheckedLogsRepository.HaveLogsBeenChecked(config.ContractAddresses, config.Topic)
if hasBeenCheckedErr != nil {
return hasBeenCheckedErr
}
if !hasBeenChecked {
err := extractor.CheckedHeadersRepository.MarkHeadersUnchecked(config.StartingBlockNumber)
if err != nil {
return err
}
nextErr := extractor.CheckedLogsRepository.MarkLogsChecked(config.ContractAddresses, config.Topic)
if nextErr != nil {
return nextErr
}
}
return nil
}

View File

@ -31,42 +31,52 @@ import (
)
var _ = Describe("Log extractor", func() {
var extractor *logs.LogExtractor
var (
checkedHeadersRepository *fakes.MockCheckedHeadersRepository
checkedLogsRepository *fakes.MockCheckedLogsRepository
extractor *logs.LogExtractor
)
BeforeEach(func() {
checkedHeadersRepository = &fakes.MockCheckedHeadersRepository{}
checkedLogsRepository = &fakes.MockCheckedLogsRepository{}
extractor = &logs.LogExtractor{
CheckedHeadersRepository: checkedHeadersRepository,
CheckedLogsRepository: checkedLogsRepository,
Fetcher: &mocks.MockLogFetcher{},
CheckedHeadersRepository: &fakes.MockCheckedHeadersRepository{},
LogRepository: &fakes.MockHeaderSyncLogRepository{},
Syncer: &fakes.MockTransactionSyncer{},
}
})
Describe("AddTransformerConfig", func() {
It("it includes earliest starting block number in fetch logs query", func() {
It("updates extractor's starting block number to earliest available", func() {
earlierStartingBlockNumber := rand.Int63()
laterStartingBlockNumber := earlierStartingBlockNumber + 1
extractor.AddTransformerConfig(getTransformerConfig(laterStartingBlockNumber))
extractor.AddTransformerConfig(getTransformerConfig(earlierStartingBlockNumber))
errOne := extractor.AddTransformerConfig(getTransformerConfig(laterStartingBlockNumber))
Expect(errOne).NotTo(HaveOccurred())
errTwo := extractor.AddTransformerConfig(getTransformerConfig(earlierStartingBlockNumber))
Expect(errTwo).NotTo(HaveOccurred())
Expect(*extractor.StartingBlock).To(Equal(earlierStartingBlockNumber))
})
It("includes added addresses in fetch logs query", func() {
It("adds transformer's addresses to extractor's watched addresses", func() {
addresses := []string{"0xA", "0xB"}
configWithAddresses := transformer.EventTransformerConfig{
ContractAddresses: addresses,
StartingBlockNumber: rand.Int63(),
}
extractor.AddTransformerConfig(configWithAddresses)
err := extractor.AddTransformerConfig(configWithAddresses)
Expect(err).NotTo(HaveOccurred())
expectedAddresses := transformer.HexStringsToAddresses(addresses)
Expect(extractor.Addresses).To(Equal(expectedAddresses))
})
It("includes added topics in fetch logs query", func() {
It("adds transformer's topic to extractor's watched topics", func() {
topic := "0x1"
configWithTopic := transformer.EventTransformerConfig{
ContractAddresses: []string{fakes.FakeAddress.Hex()},
@ -74,10 +84,75 @@ var _ = Describe("Log extractor", func() {
StartingBlockNumber: rand.Int63(),
}
extractor.AddTransformerConfig(configWithTopic)
err := extractor.AddTransformerConfig(configWithTopic)
Expect(err).NotTo(HaveOccurred())
Expect(extractor.Topics).To(Equal([]common.Hash{common.HexToHash(topic)}))
})
It("returns error if checking whether log has been checked returns error", func() {
checkedLogsRepository.HasLogBeenCheckedError = fakes.FakeError
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
Describe("when log has previously been checked", func() {
It("does not mark any headers unchecked", func() {
checkedLogsRepository.HasLogBeenCheckedReturn = true
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
Expect(err).NotTo(HaveOccurred())
Expect(checkedHeadersRepository.MarkHeadersUncheckedCalled).To(BeFalse())
})
})
Describe("when log has not previously been checked", func() {
BeforeEach(func() {
checkedLogsRepository.HasLogBeenCheckedReturn = false
})
It("marks headers since transformer's starting block number as unchecked", func() {
blockNumber := rand.Int63()
err := extractor.AddTransformerConfig(getTransformerConfig(blockNumber))
Expect(err).NotTo(HaveOccurred())
Expect(checkedHeadersRepository.MarkHeadersUncheckedCalled).To(BeTrue())
Expect(checkedHeadersRepository.MarkHeadersUncheckedStartingBlockNumber).To(Equal(blockNumber))
})
It("returns error if marking headers unchecked returns error", func() {
checkedHeadersRepository.MarkHeadersUncheckedReturnError = fakes.FakeError
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("persists that tranformer's log has been checked", func() {
config := getTransformerConfig(rand.Int63())
err := extractor.AddTransformerConfig(config)
Expect(err).NotTo(HaveOccurred())
Expect(checkedLogsRepository.MarkLogCheckedAddresses).To(Equal(config.ContractAddresses))
Expect(checkedLogsRepository.MarkLogCheckedTopicZero).To(Equal(config.Topic))
})
It("returns error if marking logs checked returns error", func() {
checkedLogsRepository.MarkLogCheckedError = fakes.FakeError
err := extractor.AddTransformerConfig(getTransformerConfig(rand.Int63()))
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
})
})
Describe("ExtractLogs", func() {
@ -91,7 +166,7 @@ var _ = Describe("Log extractor", func() {
Describe("when checking missing headers", func() {
It("gets missing headers since configured starting block with check_count < 1", func() {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
@ -99,16 +174,16 @@ var _ = Describe("Log extractor", func() {
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1)))
Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(int64(1)))
Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1)))
Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(int64(1)))
})
})
Describe("when rechecking headers", func() {
It("gets missing headers since configured starting block with check_count < RecheckHeaderCap", func() {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
startingBlockNumber := rand.Int63()
extractor.AddTransformerConfig(getTransformerConfig(startingBlockNumber))
@ -116,9 +191,9 @@ var _ = Describe("Log extractor", func() {
err, _ := extractor.ExtractLogs(constants.HeaderRecheck)
Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.StartingBlockNumber).To(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.EndingBlockNumber).To(Equal(int64(-1)))
Expect(mockCheckedHeadersRepository.CheckCount).To(Equal(constants.RecheckHeaderCap))
Expect(mockCheckedHeadersRepository.MissingHeadersStartingBlockNumber).To(Equal(startingBlockNumber))
Expect(mockCheckedHeadersRepository.MissingHeadersEndingBlockNumber).To(Equal(int64(-1)))
Expect(mockCheckedHeadersRepository.MissingHeadersCheckCount).To(Equal(constants.RecheckHeaderCap))
})
})
@ -274,20 +349,20 @@ var _ = Describe("Log extractor", func() {
addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
headerID := rand.Int63()
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: headerID}}
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: headerID}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
err, _ := extractor.ExtractLogs(constants.HeaderMissing)
Expect(err).NotTo(HaveOccurred())
Expect(mockCheckedHeadersRepository.HeaderID).To(Equal(headerID))
Expect(mockCheckedHeadersRepository.MarkHeaderCheckedHeaderID).To(Equal(headerID))
})
It("returns error if marking header checked fails", func() {
addFetchedLog(extractor)
addTransformerConfig(extractor)
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{Id: rand.Int63()}}
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{Id: rand.Int63()}}
mockCheckedHeadersRepository.MarkHeaderCheckedReturnError = fakes.FakeError
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
@ -321,7 +396,7 @@ func addTransformerConfig(extractor *logs.LogExtractor) {
func addMissingHeader(extractor *logs.LogExtractor) {
mockCheckedHeadersRepository := &fakes.MockCheckedHeadersRepository{}
mockCheckedHeadersRepository.ReturnHeaders = []core.Header{{}}
mockCheckedHeadersRepository.MissingHeadersReturnHeaders = []core.Header{{}}
extractor.CheckedHeadersRepository = mockCheckedHeadersRepository
}

View File

@ -22,14 +22,16 @@ import (
)
type MockLogExtractor struct {
AddedConfigs []transformer.EventTransformerConfig
ExtractLogsCount int
ExtractLogsErrors []error
MissingHeadersExist []bool
AddedConfigs []transformer.EventTransformerConfig
AddTransformerConfigError error
ExtractLogsCount int
ExtractLogsErrors []error
MissingHeadersExist []bool
}
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) {
func (extractor *MockLogExtractor) AddTransformerConfig(config transformer.EventTransformerConfig) error {
extractor.AddedConfigs = append(extractor.AddedConfigs, config)
return extractor.AddTransformerConfigError
}
func (extractor *MockLogExtractor) ExtractLogs(recheckHeaders constants.TransformerExecution) (error, bool) {

View File

@ -41,8 +41,9 @@ type EventWatcher struct {
func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
extractor := &logs.LogExtractor{
Fetcher: fetcher.NewLogFetcher(bc),
CheckedHeadersRepository: repositories.NewCheckedHeadersRepository(db),
CheckedLogsRepository: repositories.NewCheckedLogsRepository(db),
Fetcher: fetcher.NewLogFetcher(bc),
LogRepository: repositories.NewHeaderSyncLogRepository(db),
Syncer: transactions.NewTransactionsSyncer(db, bc),
}
@ -59,13 +60,17 @@ func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
}
// Adds transformers to the watcher so that their logs will be extracted and delegated.
func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTransformerInitializer) {
func (watcher *EventWatcher) AddTransformers(initializers []transformer.EventTransformerInitializer) error {
for _, initializer := range initializers {
t := initializer(watcher.db)
watcher.LogDelegator.AddTransformer(t)
watcher.LogExtractor.AddTransformerConfig(t.GetConfig())
err := watcher.LogExtractor.AddTransformerConfig(t.GetConfig())
if err != nil {
return err
}
}
return nil
}
// Extracts and delegates watched log events.

View File

@ -41,6 +41,12 @@ func (repo CheckedHeadersRepository) MarkHeaderChecked(headerID int64) error {
return err
}
// Remove checked_headers rows with block number >= starting block number
func (repo CheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error {
_, err := repo.db.Exec(`DELETE FROM public.checked_headers WHERE header_id IN (SELECT id FROM public.headers WHERE block_number >= $1)`, startingBlockNumber)
return err
}
// Return header_id if not present in checked_headers or its check_count is < passed checkCount
func (repo CheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
var result []core.Header

View File

@ -73,6 +73,47 @@ var _ = Describe("Checked Headers repository", func() {
})
})
Describe("MarkHeadersUnchecked", func() {
It("removes rows for headers <= starting block number", func() {
blockNumberOne := rand.Int63()
blockNumberTwo := blockNumberOne + 1
blockNumberThree := blockNumberOne + 2
fakeHeaderOne := fakes.GetFakeHeader(blockNumberOne)
fakeHeaderTwo := fakes.GetFakeHeader(blockNumberTwo)
fakeHeaderThree := fakes.GetFakeHeader(blockNumberThree)
headerRepository := repositories.NewHeaderRepository(db)
// insert three headers with incrementing block number
headerIdOne, insertHeaderOneErr := headerRepository.CreateOrUpdateHeader(fakeHeaderOne)
Expect(insertHeaderOneErr).NotTo(HaveOccurred())
headerIdTwo, insertHeaderTwoErr := headerRepository.CreateOrUpdateHeader(fakeHeaderTwo)
Expect(insertHeaderTwoErr).NotTo(HaveOccurred())
headerIdThree, insertHeaderThreeErr := headerRepository.CreateOrUpdateHeader(fakeHeaderThree)
Expect(insertHeaderThreeErr).NotTo(HaveOccurred())
// mark all headers checked
markHeaderOneCheckedErr := repo.MarkHeaderChecked(headerIdOne)
Expect(markHeaderOneCheckedErr).NotTo(HaveOccurred())
markHeaderTwoCheckedErr := repo.MarkHeaderChecked(headerIdTwo)
Expect(markHeaderTwoCheckedErr).NotTo(HaveOccurred())
markHeaderThreeCheckedErr := repo.MarkHeaderChecked(headerIdThree)
Expect(markHeaderThreeCheckedErr).NotTo(HaveOccurred())
// mark headers unchecked since blockNumberTwo
err := repo.MarkHeadersUnchecked(blockNumberTwo)
Expect(err).NotTo(HaveOccurred())
var headerOneChecked, headerTwoChecked, headerThreeChecked bool
getHeaderOneErr := db.Get(&headerOneChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdOne)
Expect(getHeaderOneErr).NotTo(HaveOccurred())
Expect(headerOneChecked).To(BeTrue())
getHeaderTwoErr := db.Get(&headerTwoChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdTwo)
Expect(getHeaderTwoErr).NotTo(HaveOccurred())
Expect(headerTwoChecked).To(BeFalse())
getHeaderThreeErr := db.Get(&headerThreeChecked, `SELECT EXISTS(SELECT 1 FROM public.checked_headers WHERE header_id = $1)`, headerIdThree)
Expect(getHeaderThreeErr).NotTo(HaveOccurred())
Expect(headerThreeChecked).To(BeFalse())
})
})
Describe("MissingHeaders", func() {
var (
headerRepository datastore.HeaderRepository

View File

@ -0,0 +1,69 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories
import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type CheckedLogsRepository struct {
db *postgres.DB
}
func NewCheckedLogsRepository(db *postgres.DB) CheckedLogsRepository {
return CheckedLogsRepository{db: db}
}
// Return whether a given address + topic0 has been fetched on a previous run of vDB
func (repository CheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) {
for _, address := range addresses {
var addressExists bool
getAddressExistsErr := repository.db.Get(&addressExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1)`, address)
if getAddressExistsErr != nil {
return false, getAddressExistsErr
}
if !addressExists {
return false, nil
}
}
var topicZeroExists bool
getTopicZeroExistsErr := repository.db.Get(&topicZeroExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE topic_zero = $1)`, topic0)
if getTopicZeroExistsErr != nil {
return false, getTopicZeroExistsErr
}
return topicZeroExists, nil
}
// Persist that a given address + topic0 has is being fetched on this run of vDB
func (repository CheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error {
tx, txErr := repository.db.Beginx()
if txErr != nil {
return txErr
}
for _, address := range addresses {
_, insertErr := tx.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, address, topic0)
if insertErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
logrus.Errorf("error rolling back transaction inserting checked logs: %s", rollbackErr.Error())
}
return insertErr
}
}
return tx.Commit()
}

View File

@ -0,0 +1,110 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repositories_test
import (
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("Checked logs repository", func() {
var (
db *postgres.DB
fakeAddress = fakes.FakeAddress.Hex()
fakeAddresses = []string{fakeAddress}
fakeTopicZero = fakes.FakeHash.Hex()
repository datastore.CheckedLogsRepository
)
BeforeEach(func() {
db = test_config.NewTestDB(test_config.NewTestNode())
test_config.CleanTestDB(db)
repository = repositories.NewCheckedLogsRepository(db)
})
Describe("HaveLogsBeenChecked", func() {
It("returns true if all addresses and the topic0 are already present in the db", func() {
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeTrue())
})
It("returns true if addresses and topic0 were fetched because of a combination of other transformers", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex()
// insert row with matching address but different topic0
_, insertOneErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
Expect(insertOneErr).NotTo(HaveOccurred())
// insert row with matching topic0 but different address
_, insertTwoErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, anotherFakeAddress, fakeTopicZero)
Expect(insertTwoErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeTrue())
})
It("returns false if any address has not been checked", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, fakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeFalse())
})
It("returns false if topic0 has not been checked", func() {
anotherFakeTopicZero := common.HexToHash("0x" + fakes.RandomString(64)).Hex()
_, insertErr := db.Exec(`INSERT INTO public.checked_logs (contract_address, topic_zero) VALUES ($1, $2)`, fakeAddress, anotherFakeTopicZero)
Expect(insertErr).NotTo(HaveOccurred())
hasBeenChecked, err := repository.HaveLogsBeenChecked(fakeAddresses, fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
Expect(hasBeenChecked).To(BeFalse())
})
})
Describe("MarkLogsChecked", func() {
It("adds a row for all of transformer's addresses + topic0", func() {
anotherFakeAddress := common.HexToAddress("0x" + fakes.RandomString(40)).Hex()
err := repository.MarkLogsChecked(append(fakeAddresses, anotherFakeAddress), fakeTopicZero)
Expect(err).NotTo(HaveOccurred())
var comboOneExists, comboTwoExists bool
getComboOneErr := db.Get(&comboOneExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, fakeAddress, fakeTopicZero)
Expect(getComboOneErr).NotTo(HaveOccurred())
Expect(comboOneExists).To(BeTrue())
getComboTwoErr := db.Get(&comboTwoExists, `SELECT EXISTS(SELECT 1 FROM public.checked_logs WHERE contract_address = $1 AND topic_zero = $2)`, anotherFakeAddress, fakeTopicZero)
Expect(getComboTwoErr).NotTo(HaveOccurred())
Expect(comboTwoExists).To(BeTrue())
})
})
})

View File

@ -36,9 +36,15 @@ type BlockRepository interface {
type CheckedHeadersRepository interface {
MarkHeaderChecked(headerID int64) error
MarkHeadersUnchecked(startingBlockNumber int64) error
MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error)
}
type CheckedLogsRepository interface {
HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error)
MarkLogsChecked(addresses []string, topic0 string) error
}
type ContractRepository interface {
CreateContract(contract core.Contract) error
GetContract(contractHash string) (core.Contract, error)

View File

@ -0,0 +1,39 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package fakes
type MockCheckedLogsRepository struct {
HasLogBeenCheckedAddresses []string
HasLogBeenCheckedError error
HasLogBeenCheckedReturn bool
HasLogBeenCheckedTopicZero string
MarkLogCheckedAddresses []string
MarkLogCheckedError error
MarkLogCheckedTopicZero string
}
func (repository *MockCheckedLogsRepository) HaveLogsBeenChecked(addresses []string, topic0 string) (bool, error) {
repository.HasLogBeenCheckedAddresses = addresses
repository.HasLogBeenCheckedTopicZero = topic0
return repository.HasLogBeenCheckedReturn, repository.HasLogBeenCheckedError
}
func (repository *MockCheckedLogsRepository) MarkLogsChecked(addresses []string, topic0 string) error {
repository.MarkLogCheckedAddresses = addresses
repository.MarkLogCheckedTopicZero = topic0
return repository.MarkLogCheckedError
}

View File

@ -20,16 +20,16 @@ import (
"bytes"
"encoding/json"
"errors"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"math/rand"
"strconv"
"time"
)
var (
FakeAddress = common.HexToAddress("0x1234567890abcdef")
FakeAddress = common.HexToAddress("0x" + RandomString(40))
FakeError = errors.New("failed")
FakeHash = common.BytesToHash([]byte{1, 2, 3, 4, 5})
fakeTimestamp = int64(111111111)
@ -103,3 +103,15 @@ func GetFakeUncle(hash, reward string) core.Uncle {
Timestamp: strconv.FormatInt(fakeTimestamp, 10),
}
}
func RandomString(length int) string {
var seededRand = rand.New(
rand.NewSource(time.Now().UnixNano()))
charset := "abcdef1234567890"
b := make([]byte, length)
for i := range b {
b[i] = charset[seededRand.Intn(len(charset))]
}
return string(b)
}

View File

@ -21,23 +21,32 @@ import (
)
type MockCheckedHeadersRepository struct {
CheckCount int64
StartingBlockNumber int64
EndingBlockNumber int64
HeaderID int64
ReturnHeaders []core.Header
MarkHeaderCheckedReturnError error
MissingHeadersReturnError error
MarkHeaderCheckedHeaderID int64
MarkHeaderCheckedReturnError error
MarkHeadersUncheckedCalled bool
MarkHeadersUncheckedReturnError error
MarkHeadersUncheckedStartingBlockNumber int64
MissingHeadersCheckCount int64
MissingHeadersEndingBlockNumber int64
MissingHeadersReturnError error
MissingHeadersReturnHeaders []core.Header
MissingHeadersStartingBlockNumber int64
}
func (repository *MockCheckedHeadersRepository) MarkHeadersUnchecked(startingBlockNumber int64) error {
repository.MarkHeadersUncheckedCalled = true
repository.MarkHeadersUncheckedStartingBlockNumber = startingBlockNumber
return repository.MarkHeadersUncheckedReturnError
}
func (repository *MockCheckedHeadersRepository) MarkHeaderChecked(headerID int64) error {
repository.HeaderID = headerID
repository.MarkHeaderCheckedHeaderID = headerID
return repository.MarkHeaderCheckedReturnError
}
func (repository *MockCheckedHeadersRepository) MissingHeaders(startingBlockNumber, endingBlockNumber, checkCount int64) ([]core.Header, error) {
repository.StartingBlockNumber = startingBlockNumber
repository.EndingBlockNumber = endingBlockNumber
repository.CheckCount = checkCount
return repository.ReturnHeaders, repository.MissingHeadersReturnError
repository.MissingHeadersStartingBlockNumber = startingBlockNumber
repository.MissingHeadersEndingBlockNumber = endingBlockNumber
repository.MissingHeadersCheckCount = checkCount
return repository.MissingHeadersReturnHeaders, repository.MissingHeadersReturnError
}

View File

@ -106,6 +106,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM addresses")
db.MustExec("DELETE FROM blocks")
db.MustExec("DELETE FROM checked_headers")
db.MustExec("DELETE FROM checked_logs")
// can't delete from eth_nodes since this function is called after the required eth_node is persisted
db.MustExec("DELETE FROM full_sync_logs")
db.MustExec("DELETE FROM full_sync_receipts")