diff --git a/cmd/continuousLogSync.go b/cmd/continuousLogSync.go index 9497d30b..55b6106c 100644 --- a/cmd/continuousLogSync.go +++ b/cmd/continuousLogSync.go @@ -101,6 +101,7 @@ func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer transformerInitializerMap["tend"] = transformers.TendTransformerInitializer transformerInitializerMap["vatInit"] = transformers.VatInitTransformerInitializer transformerInitializerMap["vatToll"] = transformers.VatTollTransformerInitializer + transformerInitializerMap["vatTune"] = transformers.VatTuneTransformerInitializer return transformerInitializerMap } diff --git a/db/migrations/1538689084_create_vat_tune.down.sql b/db/migrations/1538689084_create_vat_tune.down.sql new file mode 100644 index 00000000..1d1a34ec --- /dev/null +++ b/db/migrations/1538689084_create_vat_tune.down.sql @@ -0,0 +1,3 @@ +DROP TABLE maker.vat_tune; +ALTER TABLE public.checked_headers + DROP COLUMN vat_tune_checked; \ No newline at end of file diff --git a/db/migrations/1538689084_create_vat_tune.up.sql b/db/migrations/1538689084_create_vat_tune.up.sql new file mode 100644 index 00000000..d308a7bd --- /dev/null +++ b/db/migrations/1538689084_create_vat_tune.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE maker.vat_tune ( + id SERIAL PRIMARY KEY, + header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, + ilk TEXT, + urn TEXT, + v TEXT, + w TEXT, + dink NUMERIC, + dart NUMERIC, + tx_idx INTEGER NOT NULL, + raw_log JSONB, + UNIQUE (header_id, tx_idx) +); + +ALTER TABLE public.checked_headers + ADD COLUMN vat_tune_checked BOOLEAN NOT NULL DEFAULT FALSE; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index ff8c6e35..7d60455d 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 10.4 --- Dumped by pg_dump version 10.4 +-- Dumped from database version 10.3 +-- Dumped by pg_dump version 10.3 SET statement_timeout = 0; SET lock_timeout = 0; @@ -765,6 +765,44 @@ CREATE SEQUENCE maker.vat_toll_id_seq ALTER SEQUENCE maker.vat_toll_id_seq OWNED BY maker.vat_toll.id; +-- +-- Name: vat_tune; Type: TABLE; Schema: maker; Owner: - +-- + +CREATE TABLE maker.vat_tune ( + id integer NOT NULL, + header_id integer NOT NULL, + ilk text, + urn text, + v text, + w text, + dink numeric, + dart numeric, + tx_idx integer NOT NULL, + raw_log jsonb +); + + +-- +-- Name: vat_tune_id_seq; Type: SEQUENCE; Schema: maker; Owner: - +-- + +CREATE SEQUENCE maker.vat_tune_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: vat_tune_id_seq; Type: SEQUENCE OWNED BY; Schema: maker; Owner: - +-- + +ALTER SEQUENCE maker.vat_tune_id_seq OWNED BY maker.vat_tune.id; + + -- -- Name: logs; Type: TABLE; Schema: public; Owner: - -- @@ -866,7 +904,8 @@ CREATE TABLE public.checked_headers ( pit_file_ilk_checked boolean DEFAULT false NOT NULL, pit_file_stability_fee_checked boolean DEFAULT false NOT NULL, vat_init_checked boolean DEFAULT false NOT NULL, - vat_toll_checked boolean DEFAULT false NOT NULL + vat_toll_checked boolean DEFAULT false NOT NULL, + vat_tune_checked boolean DEFAULT false NOT NULL ); @@ -1327,6 +1366,13 @@ ALTER TABLE ONLY maker.vat_init ALTER COLUMN id SET DEFAULT nextval('maker.vat_i ALTER TABLE ONLY maker.vat_toll ALTER COLUMN id SET DEFAULT nextval('maker.vat_toll_id_seq'::regclass); +-- +-- Name: vat_tune id; Type: DEFAULT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.vat_tune ALTER COLUMN id SET DEFAULT nextval('maker.vat_tune_id_seq'::regclass); + + -- -- Name: blocks id; Type: DEFAULT; Schema: public; Owner: - -- @@ -1717,6 +1763,22 @@ ALTER TABLE ONLY maker.vat_toll ADD CONSTRAINT vat_toll_pkey PRIMARY KEY (id); +-- +-- Name: vat_tune vat_tune_header_id_tx_idx_key; Type: CONSTRAINT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.vat_tune + ADD CONSTRAINT vat_tune_header_id_tx_idx_key UNIQUE (header_id, tx_idx); + + +-- +-- Name: vat_tune vat_tune_pkey; Type: CONSTRAINT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.vat_tune + ADD CONSTRAINT vat_tune_pkey PRIMARY KEY (id); + + -- -- Name: blocks blocks_pkey; Type: CONSTRAINT; Schema: public; Owner: - -- @@ -2031,6 +2093,14 @@ ALTER TABLE ONLY maker.vat_toll ADD CONSTRAINT vat_toll_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; +-- +-- Name: vat_tune vat_tune_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: - +-- + +ALTER TABLE ONLY maker.vat_tune + ADD CONSTRAINT vat_tune_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; + + -- -- Name: transactions blocks_fk; Type: FK CONSTRAINT; Schema: public; Owner: - -- diff --git a/pkg/transformers/shared/constants.go b/pkg/transformers/shared/constants.go index 2e411204..f6232779 100644 --- a/pkg/transformers/shared/constants.go +++ b/pkg/transformers/shared/constants.go @@ -57,6 +57,7 @@ var ( tendMethod = GetSolidityMethodSignature(FlipperABI, "tend") vatInitMethod = GetSolidityMethodSignature(VatABI, "init") vatTollMethod = GetSolidityMethodSignature(VatABI, "toll") + vatTuneMethod = GetSolidityMethodSignature(VatABI, "tune") BiteSignature = GetEventSignature(biteMethod) DealSignature = GetLogNoteSignature(dealMethod) @@ -78,4 +79,5 @@ var ( TendFunctionSignature = GetLogNoteSignature(tendMethod) VatInitSignature = GetLogNoteSignature(vatInitMethod) VatTollSignature = GetLogNoteSignature(vatTollMethod) + VatTuneSignature = GetLogNoteSignature(vatTuneMethod) ) diff --git a/pkg/transformers/shared/utilities.go b/pkg/transformers/shared/utilities.go index 83afd982..91f2ccaf 100644 --- a/pkg/transformers/shared/utilities.go +++ b/pkg/transformers/shared/utilities.go @@ -32,3 +32,15 @@ func BigIntToString(value *big.Int) string { return result } } + +func GetDataBytesAtIndex(n int, logData []byte) []byte { + switch { + case n == -1: + return logData[len(logData)-DataItemLength:] + case n == -2: + return logData[len(logData)-(2*DataItemLength) : len(logData)-DataItemLength] + case n == -3: + return logData[len(logData)-(3*DataItemLength) : len(logData)-(2*DataItemLength)] + } + return []byte{} +} diff --git a/pkg/transformers/shared/utilities_test.go b/pkg/transformers/shared/utilities_test.go new file mode 100644 index 00000000..03e5ddc0 --- /dev/null +++ b/pkg/transformers/shared/utilities_test.go @@ -0,0 +1,49 @@ +package shared_test + +import ( + "github.com/ethereum/go-ethereum/common/hexutil" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "math/big" +) + +var _ = Describe("Shared utilities", func() { + Describe("getting data at index", func() { + It("gets bytes for the last index in log data", func() { + logData := hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000c45dd6471a66616b6520696c6b0000000000000000000000000000000000000000000000007d7bee5fcfd8028cf7b00876c5b1421c800561a6000000000000000000000000a3e37186e017747dba34042e83e3f76ad3cce9b00000000000000000000000000f243e26db94b5426032e6dfa6007802dea2a61400000000000000000000000000000000000000000000000000000000000000000000000000000000075bcd15000000000000000000000000000000000000000000000000000000003ade68b1") + bigIntBytes := big.NewInt(987654321).Bytes() + // big.Int.Bytes() does not include zero padding, but bytes in data index are of fixed length and include zero padding + expected := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + expected = append(expected, bigIntBytes...) + + actual := shared.GetDataBytesAtIndex(-1, logData) + + Expect(expected[:]).To(Equal(actual)) + }) + + It("gets bytes for the second-to-last index in log data", func() { + logData := hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000c45dd6471a66616b6520696c6b0000000000000000000000000000000000000000000000007d7bee5fcfd8028cf7b00876c5b1421c800561a6000000000000000000000000a3e37186e017747dba34042e83e3f76ad3cce9b00000000000000000000000000f243e26db94b5426032e6dfa6007802dea2a61400000000000000000000000000000000000000000000000000000000000000000000000000000000075bcd15000000000000000000000000000000000000000000000000000000003ade68b1") + bigIntBytes := big.NewInt(123456789).Bytes() + expected := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + expected = append(expected, bigIntBytes...) + + actual := shared.GetDataBytesAtIndex(-2, logData) + + Expect(expected[:]).To(Equal(actual)) + }) + + It("gets bytes for the third-to-last index in log data", func() { + logData := hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000c45dd6471a66616b6520696c6b0000000000000000000000000000000000000000000000007d7bee5fcfd8028cf7b00876c5b1421c800561a6000000000000000000000000a3e37186e017747dba34042e83e3f76ad3cce9b00000000000000000000000000f243e26db94b5426032e6dfa6007802dea2a61400000000000000000000000000000000000000000000000000000000000000000000000000000000075bcd15000000000000000000000000000000000000000000000000000000003ade68b1") + addressBytes := common.HexToAddress("0x0F243E26db94B5426032E6DFA6007802Dea2a614").Bytes() + // common.address.Bytes() returns [20]byte{}, need [32]byte{} + expected := append(addressBytes, []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}...) + + actual := shared.GetDataBytesAtIndex(-3, logData) + + Expect(expected[:]).To(Equal(actual)) + }) + }) +}) diff --git a/pkg/transformers/test_data/mocks/vat_tune/converter.go b/pkg/transformers/test_data/mocks/vat_tune/converter.go new file mode 100644 index 00000000..5dbef7fd --- /dev/null +++ b/pkg/transformers/test_data/mocks/vat_tune/converter.go @@ -0,0 +1,21 @@ +package vat_tune + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" +) + +type MockVatTuneConverter struct { + err error + PassedLogs []types.Log +} + +func (converter *MockVatTuneConverter) ToModels(ethLogs []types.Log) ([]vat_tune.VatTuneModel, error) { + converter.PassedLogs = ethLogs + return []vat_tune.VatTuneModel{test_data.VatTuneModel}, converter.err +} + +func (converter *MockVatTuneConverter) SetConverterError(e error) { + converter.err = e +} diff --git a/pkg/transformers/test_data/mocks/vat_tune/repository.go b/pkg/transformers/test_data/mocks/vat_tune/repository.go new file mode 100644 index 00000000..164d145d --- /dev/null +++ b/pkg/transformers/test_data/mocks/vat_tune/repository.go @@ -0,0 +1,57 @@ +package vat_tune + +import ( + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" +) + +type MockVatTuneRepository struct { + createErr error + markHeaderCheckedErr error + markHeaderCheckedPassedHeaderID int64 + missingHeaders []core.Header + missingHeadersErr error + PassedStartingBlockNumber int64 + PassedEndingBlockNumber int64 + PassedHeaderID int64 + PassedModels []vat_tune.VatTuneModel +} + +func (repository *MockVatTuneRepository) Create(headerID int64, models []vat_tune.VatTuneModel) error { + repository.PassedHeaderID = headerID + repository.PassedModels = models + return repository.createErr +} + +func (repository *MockVatTuneRepository) MarkHeaderChecked(headerID int64) error { + repository.markHeaderCheckedPassedHeaderID = headerID + return repository.markHeaderCheckedErr +} + +func (repository *MockVatTuneRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { + repository.PassedStartingBlockNumber = startingBlockNumber + repository.PassedEndingBlockNumber = endingBlockNumber + return repository.missingHeaders, repository.missingHeadersErr +} + +func (repository *MockVatTuneRepository) SetMissingHeadersErr(e error) { + repository.missingHeadersErr = e +} + +func (repository *MockVatTuneRepository) SetMissingHeaders(headers []core.Header) { + repository.missingHeaders = headers +} + +func (repository *MockVatTuneRepository) AssertMarkHeaderCheckedCalledWith(i int64) { + Expect(repository.markHeaderCheckedPassedHeaderID).To(Equal(i)) +} + +func (repository *MockVatTuneRepository) SetMarkHeaderCheckedErr(e error) { + repository.markHeaderCheckedErr = e +} + +func (repository *MockVatTuneRepository) SetCreateError(e error) { + repository.createErr = e +} diff --git a/pkg/transformers/test_data/vat_tune.go b/pkg/transformers/test_data/vat_tune.go new file mode 100644 index 00000000..0cc6a03e --- /dev/null +++ b/pkg/transformers/test_data/vat_tune.go @@ -0,0 +1,41 @@ +package test_data + +import ( + "encoding/json" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" +) + +var EthVatTuneLog = types.Log{ + Address: common.HexToAddress("0x239E6f0AB02713f1F8AA90ebeDeD9FC66Dc96CD6"), + Topics: []common.Hash{ + common.HexToHash("0x5dd6471a00000000000000000000000000000000000000000000000000000000"), + common.HexToHash("0x66616b6520696c6b000000000000000000000000000000000000000000000000"), + common.HexToHash("0x7d7bee5fcfd8028cf7b00876c5b1421c800561a6000000000000000000000000"), + common.HexToHash("0xa3e37186e017747dba34042e83e3f76ad3cce9b0000000000000000000000000"), + }, + Data: hexutil.MustDecode("0x000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000c45dd6471a66616b6520696c6b0000000000000000000000000000000000000000000000007d7bee5fcfd8028cf7b00876c5b1421c800561a6000000000000000000000000a3e37186e017747dba34042e83e3f76ad3cce9b00000000000000000000000000f243e26db94b5426032e6dfa6007802dea2a61400000000000000000000000000000000000000000000000000000000000000000000000000000000075bcd15000000000000000000000000000000000000000000000000000000003ade68b1"), + BlockNumber: 22, + TxHash: common.HexToHash("0x2b19ef3965420d2d5eb8792eafbd5e365b2866200cfc4473835971f6965f831c"), + TxIndex: 3, + BlockHash: common.HexToHash("0xf3a4a9ecb79a721421b347424dc9fa072ca762f3ba340557b54a205f058f59a6"), + Index: 6, + Removed: false, +} + +var rawVatTuneLog, _ = json.Marshal(EthVatTuneLog) +var VatTuneModel = vat_tune.VatTuneModel{ + Ilk: "fake ilk", + Urn: "0x7d7bEe5fCfD8028cf7b00876C5b1421c800561A6", + V: "0xA3E37186E017747DbA34042e83e3F76Ad3CcE9b0", + W: "0x0F243E26db94B5426032E6DFA6007802Dea2a614", + Dink: big.NewInt(123456789).String(), + Dart: big.NewInt(987654321).String(), + TransactionIndex: EthVatTuneLog.TxIndex, + Raw: rawVatTuneLog, +} diff --git a/pkg/transformers/transformers.go b/pkg/transformers/transformers.go index 8d823834..1b966f8c 100644 --- a/pkg/transformers/transformers.go +++ b/pkg/transformers/transformers.go @@ -39,6 +39,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/transformers/tend" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_init" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_toll" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" ) var ( @@ -65,6 +66,7 @@ var ( TendTransformerInitializer = tend.TendTransformerInitializer{Config: tend.TendConfig}.NewTendTransformer VatInitTransformerInitializer = vat_init.VatInitTransformerInitializer{Config: vat_init.VatInitConfig}.NewVatInitTransformer VatTollTransformerInitializer = vat_toll.VatTollTransformerInitializer{Config: vat_toll.VatTollConfig}.NewVatTollTransformer + VatTuneTransformerInitializer = vat_tune.VatTuneTransformerInitializer{Config: vat_tune.VatTuneConfig}.NewVatTuneTransformer ) func TransformerInitializers() []shared.TransformerInitializer { @@ -89,5 +91,6 @@ func TransformerInitializers() []shared.TransformerInitializer { TendTransformerInitializer, VatInitTransformerInitializer, VatTollTransformerInitializer, + VatTuneTransformerInitializer, } } diff --git a/pkg/transformers/vat_tune/config.go b/pkg/transformers/vat_tune/config.go new file mode 100644 index 00000000..589f63f4 --- /dev/null +++ b/pkg/transformers/vat_tune/config.go @@ -0,0 +1,11 @@ +package vat_tune + +import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + +var VatTuneConfig = shared.TransformerConfig{ + ContractAddresses: []string{shared.VatContractAddress}, + ContractAbi: shared.VatABI, + Topics: []string{shared.VatTuneSignature}, + StartingBlockNumber: 0, + EndingBlockNumber: 10000000, +} diff --git a/pkg/transformers/vat_tune/converter.go b/pkg/transformers/vat_tune/converter.go new file mode 100644 index 00000000..b01f2892 --- /dev/null +++ b/pkg/transformers/vat_tune/converter.go @@ -0,0 +1,65 @@ +package vat_tune + +import ( + "bytes" + "encoding/json" + "errors" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" +) + +type Converter interface { + ToModels(ethLogs []types.Log) ([]VatTuneModel, error) +} + +type VatTuneConverter struct{} + +func (VatTuneConverter) ToModels(ethLogs []types.Log) ([]VatTuneModel, error) { + var models []VatTuneModel + for _, ethLog := range ethLogs { + err := verifyLog(ethLog) + if err != nil { + return nil, err + } + ilk := string(bytes.Trim(ethLog.Topics[1].Bytes(), "\x00")) + urn := common.BytesToAddress(ethLog.Topics[2].Bytes()[:common.AddressLength]) + v := common.BytesToAddress(ethLog.Topics[3].Bytes()[:common.AddressLength]) + wBytes := shared.GetDataBytesAtIndex(-3, ethLog.Data) + w := common.BytesToAddress(wBytes[:common.AddressLength]) + dinkBytes := shared.GetDataBytesAtIndex(-2, ethLog.Data) + dink := big.NewInt(0).SetBytes(dinkBytes).String() + dartBytes := shared.GetDataBytesAtIndex(-1, ethLog.Data) + dart := big.NewInt(0).SetBytes(dartBytes).String() + + raw, err := json.Marshal(ethLog) + if err != nil { + return nil, err + } + model := VatTuneModel{ + Ilk: ilk, + Urn: urn.String(), + V: v.String(), + W: w.String(), + Dink: dink, + Dart: dart, + TransactionIndex: ethLog.TxIndex, + Raw: raw, + } + models = append(models, model) + } + return models, nil +} + +func verifyLog(log types.Log) error { + if len(log.Topics) < 4 { + return errors.New("log missing topics") + } + if len(log.Data) < shared.DataItemLength { + return errors.New("log missing data") + } + return nil +} diff --git a/pkg/transformers/vat_tune/converter_test.go b/pkg/transformers/vat_tune/converter_test.go new file mode 100644 index 00000000..09347aaa --- /dev/null +++ b/pkg/transformers/vat_tune/converter_test.go @@ -0,0 +1,45 @@ +package vat_tune_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" +) + +var _ = Describe("Vat tune converter", func() { + It("returns err if log is missing topics", func() { + converter := vat_tune.VatTuneConverter{} + badLog := types.Log{ + Data: []byte{1, 1, 1, 1, 1}, + } + + _, err := converter.ToModels([]types.Log{badLog}) + + Expect(err).To(HaveOccurred()) + }) + + It("returns err if log is missing data", func() { + converter := vat_tune.VatTuneConverter{} + badLog := types.Log{ + Topics: []common.Hash{{}, {}, {}, {}}, + } + + _, err := converter.ToModels([]types.Log{badLog}) + + Expect(err).To(HaveOccurred()) + }) + + It("converts a log to an model", func() { + converter := vat_tune.VatTuneConverter{} + + models, err := converter.ToModels([]types.Log{test_data.EthVatTuneLog}) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(models)).To(Equal(1)) + Expect(models[0]).To(Equal(test_data.VatTuneModel)) + }) +}) diff --git a/pkg/transformers/vat_tune/model.go b/pkg/transformers/vat_tune/model.go new file mode 100644 index 00000000..a625d1e0 --- /dev/null +++ b/pkg/transformers/vat_tune/model.go @@ -0,0 +1,12 @@ +package vat_tune + +type VatTuneModel struct { + Ilk string + Urn string + V string + W string + Dink string + Dart string + TransactionIndex uint `db:"tx_idx"` + Raw []byte `db:"raw_log"` +} diff --git a/pkg/transformers/vat_tune/repository.go b/pkg/transformers/vat_tune/repository.go new file mode 100644 index 00000000..10db10d5 --- /dev/null +++ b/pkg/transformers/vat_tune/repository.go @@ -0,0 +1,74 @@ +package vat_tune + +import ( + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" +) + +type Repository interface { + Create(headerID int64, models []VatTuneModel) error + MarkHeaderChecked(headerID int64) error + MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) +} + +type VatTuneRepository struct { + db *postgres.DB +} + +func NewVatTuneRepository(db *postgres.DB) VatTuneRepository { + return VatTuneRepository{ + db: db, + } +} + +func (repository VatTuneRepository) Create(headerID int64, models []VatTuneModel) error { + tx, err := repository.db.Begin() + if err != nil { + return err + } + for _, model := range models { + _, err = tx.Exec( + `INSERT into maker.vat_tune (header_id, ilk, urn, v, w, dink, dart, tx_idx, raw_log) + VALUES($1, $2, $3, $4, $5, $6::NUMERIC, $7::NUMERIC, $8, $9)`, + headerID, model.Ilk, model.Urn, model.V, model.W, model.Dink, model.Dart, model.TransactionIndex, model.Raw, + ) + if err != nil { + tx.Rollback() + return err + } + } + _, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vat_tune_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET vat_tune_checked = $2`, headerID, true) + if err != nil { + tx.Rollback() + return err + } + return tx.Commit() +} + +func (repository VatTuneRepository) MarkHeaderChecked(headerID int64) error { + _, err := repository.db.Exec(`INSERT INTO public.checked_headers (header_id, vat_tune_checked) + VALUES ($1, $2) + ON CONFLICT (header_id) DO + UPDATE SET vat_tune_checked = $2`, headerID, true) + return err +} + +func (repository VatTuneRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) { + var result []core.Header + err := repository.db.Select( + &result, + `SELECT headers.id, headers.block_number FROM headers + LEFT JOIN checked_headers on headers.id = header_id + WHERE (header_id ISNULL OR vat_tune_checked IS FALSE) + AND headers.block_number >= $1 + AND headers.block_number <= $2 + AND headers.eth_node_fingerprint = $3`, + startingBlockNumber, + endingBlockNumber, + repository.db.Node.ID, + ) + return result, err +} diff --git a/pkg/transformers/vat_tune/repository_test.go b/pkg/transformers/vat_tune/repository_test.go new file mode 100644 index 00000000..a0e54ac6 --- /dev/null +++ b/pkg/transformers/vat_tune/repository_test.go @@ -0,0 +1,209 @@ +package vat_tune_test + +import ( + "database/sql" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" + "github.com/vulcanize/vulcanizedb/test_config" +) + +var _ = Describe("Vat tune repository", func() { + Describe("Create", func() { + var ( + db *postgres.DB + vatTuneRepository vat_tune.Repository + err error + headerID int64 + ) + + BeforeEach(func() { + db = test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err = headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + vatTuneRepository = vat_tune.NewVatTuneRepository(db) + }) + + It("adds a vat tune event", func() { + err = vatTuneRepository.Create(headerID, []vat_tune.VatTuneModel{test_data.VatTuneModel}) + + Expect(err).NotTo(HaveOccurred()) + var dbVatTune vat_tune.VatTuneModel + err = db.Get(&dbVatTune, `SELECT ilk, urn, v, w, dink, dart, tx_idx, raw_log FROM maker.vat_tune WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(dbVatTune.Ilk).To(Equal(test_data.VatTuneModel.Ilk)) + Expect(dbVatTune.Urn).To(Equal(test_data.VatTuneModel.Urn)) + Expect(dbVatTune.V).To(Equal(test_data.VatTuneModel.V)) + Expect(dbVatTune.W).To(Equal(test_data.VatTuneModel.W)) + Expect(dbVatTune.Dink).To(Equal(test_data.VatTuneModel.Dink)) + Expect(dbVatTune.Dart).To(Equal(test_data.VatTuneModel.Dart)) + Expect(dbVatTune.TransactionIndex).To(Equal(test_data.VatTuneModel.TransactionIndex)) + Expect(dbVatTune.Raw).To(MatchJSON(test_data.VatTuneModel.Raw)) + }) + + It("marks header as checked for logs", func() { + err = vatTuneRepository.Create(headerID, []vat_tune.VatTuneModel{test_data.VatTuneModel}) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("does not duplicate pit file vat_tune events", func() { + err = vatTuneRepository.Create(headerID, []vat_tune.VatTuneModel{test_data.VatTuneModel}) + Expect(err).NotTo(HaveOccurred()) + + err = vatTuneRepository.Create(headerID, []vat_tune.VatTuneModel{test_data.VatTuneModel}) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint")) + }) + + It("removes pit file vat_tune if corresponding header is deleted", func() { + err = vatTuneRepository.Create(headerID, []vat_tune.VatTuneModel{test_data.VatTuneModel}) + Expect(err).NotTo(HaveOccurred()) + + _, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerID) + + Expect(err).NotTo(HaveOccurred()) + var dbVatTune vat_tune.VatTuneModel + err = db.Get(&dbVatTune, `SELECT ilk, urn, v, w, dink, dart, tx_idx, raw_log FROM maker.vat_tune WHERE header_id = $1`, headerID) + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(sql.ErrNoRows)) + }) + }) + + Describe("MarkHeaderChecked", func() { + var ( + db *postgres.DB + vatTuneRepository vat_tune.Repository + err error + headerID int64 + ) + + BeforeEach(func() { + db = test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + headerID, err = headerRepository.CreateOrUpdateHeader(core.Header{}) + Expect(err).NotTo(HaveOccurred()) + vatTuneRepository = vat_tune.NewVatTuneRepository(db) + }) + + It("creates a row for a new headerID", func() { + err = vatTuneRepository.MarkHeaderChecked(headerID) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + + It("updates row when headerID already exists", func() { + _, err = db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerID) + + err = vatTuneRepository.MarkHeaderChecked(headerID) + + Expect(err).NotTo(HaveOccurred()) + var headerChecked bool + err = db.Get(&headerChecked, `SELECT vat_tune_checked FROM public.checked_headers WHERE header_id = $1`, headerID) + Expect(err).NotTo(HaveOccurred()) + Expect(headerChecked).To(BeTrue()) + }) + }) + + Describe("MissingHeaders", func() { + It("returns headers that haven't been checked", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + startingBlockNumber := int64(1) + vatTuneBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, vatTuneBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + vatTuneRepository := vat_tune.NewVatTuneRepository(db) + err := vatTuneRepository.MarkHeaderChecked(headerIDs[1]) + Expect(err).NotTo(HaveOccurred()) + + headers, err := vatTuneRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(2)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber))) + }) + + It("only treats headers as checked if vat tune logs have been checked", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + headerRepository := repositories.NewHeaderRepository(db) + startingBlockNumber := int64(1) + vatTunedBlockNumber := int64(2) + endingBlockNumber := int64(3) + blockNumbers := []int64{startingBlockNumber, vatTunedBlockNumber, endingBlockNumber, endingBlockNumber + 1} + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + headerIDs = append(headerIDs, headerID) + Expect(err).NotTo(HaveOccurred()) + } + vatTuneRepository := vat_tune.NewVatTuneRepository(db) + _, err := db.Exec(`INSERT INTO public.checked_headers (header_id) VALUES ($1)`, headerIDs[1]) + Expect(err).NotTo(HaveOccurred()) + + headers, err := vatTuneRepository.MissingHeaders(startingBlockNumber, endingBlockNumber) + + Expect(err).NotTo(HaveOccurred()) + Expect(len(headers)).To(Equal(3)) + Expect(headers[0].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTunedBlockNumber))) + Expect(headers[1].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTunedBlockNumber))) + Expect(headers[2].BlockNumber).To(Or(Equal(startingBlockNumber), Equal(endingBlockNumber), Equal(vatTunedBlockNumber))) + }) + + It("only returns headers associated with the current node", func() { + db := test_config.NewTestDB(core.Node{}) + test_config.CleanTestDB(db) + blockNumbers := []int64{1, 2, 3} + headerRepository := repositories.NewHeaderRepository(db) + dbTwo := test_config.NewTestDB(core.Node{ID: "second"}) + headerRepositoryTwo := repositories.NewHeaderRepository(dbTwo) + var headerIDs []int64 + for _, n := range blockNumbers { + headerID, err := headerRepository.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + Expect(err).NotTo(HaveOccurred()) + headerIDs = append(headerIDs, headerID) + _, err = headerRepositoryTwo.CreateOrUpdateHeader(core.Header{BlockNumber: n}) + Expect(err).NotTo(HaveOccurred()) + } + vatTuneRepository := vat_tune.NewVatTuneRepository(db) + vatTuneRepositoryTwo := vat_tune.NewVatTuneRepository(dbTwo) + err := vatTuneRepository.MarkHeaderChecked(headerIDs[0]) + Expect(err).NotTo(HaveOccurred()) + + nodeOneMissingHeaders, err := vatTuneRepository.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) + Expect(err).NotTo(HaveOccurred()) + Expect(len(nodeOneMissingHeaders)).To(Equal(len(blockNumbers) - 1)) + + nodeTwoMissingHeaders, err := vatTuneRepositoryTwo.MissingHeaders(blockNumbers[0], blockNumbers[len(blockNumbers)-1]) + Expect(err).NotTo(HaveOccurred()) + Expect(len(nodeTwoMissingHeaders)).To(Equal(len(blockNumbers))) + }) + }) +}) diff --git a/pkg/transformers/vat_tune/transformer.go b/pkg/transformers/vat_tune/transformer.go new file mode 100644 index 00000000..571fbe07 --- /dev/null +++ b/pkg/transformers/vat_tune/transformer.go @@ -0,0 +1,62 @@ +package vat_tune + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "log" +) + +type VatTuneTransformerInitializer struct { + Config shared.TransformerConfig +} + +func (initializer VatTuneTransformerInitializer) NewVatTuneTransformer(db *postgres.DB, blockChain core.BlockChain) shared.Transformer { + converter := VatTuneConverter{} + fetcher := shared.NewFetcher(blockChain) + repository := NewVatTuneRepository(db) + return VatTuneTransformer{ + Config: initializer.Config, + Converter: converter, + Fetcher: fetcher, + Repository: repository, + } +} + +type VatTuneTransformer struct { + Config shared.TransformerConfig + Converter Converter + Fetcher shared.LogFetcher + Repository Repository +} + +func (transformer VatTuneTransformer) Execute() error { + missingHeaders, err := transformer.Repository.MissingHeaders(transformer.Config.StartingBlockNumber, transformer.Config.EndingBlockNumber) + if err != nil { + return err + } + log.Printf("Fetching vat init event logs for %d headers \n", len(missingHeaders)) + for _, header := range missingHeaders { + topics := [][]common.Hash{{common.HexToHash(shared.VatTuneSignature)}} + matchingLogs, err := transformer.Fetcher.FetchLogs(VatTuneConfig.ContractAddresses, topics, header.BlockNumber) + if err != nil { + return err + } + if len(matchingLogs) < 1 { + err = transformer.Repository.MarkHeaderChecked(header.Id) + if err != nil { + return err + } + } + models, err := transformer.Converter.ToModels(matchingLogs) + if err != nil { + return err + } + err = transformer.Repository.Create(header.Id, models) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/transformers/vat_tune/transformer_test.go b/pkg/transformers/vat_tune/transformer_test.go new file mode 100644 index 00000000..8fbc1036 --- /dev/null +++ b/pkg/transformers/vat_tune/transformer_test.go @@ -0,0 +1,196 @@ +package vat_tune_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/vulcanizedb/pkg/core" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "github.com/vulcanize/vulcanizedb/pkg/transformers/shared" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data" + "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks" + vat_tune_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/vat_tune" + "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_tune" +) + +var _ = Describe("Vat tune transformer", func() { + It("gets missing headers for block numbers specified in config", func() { + repository := &vat_tune_mocks.MockVatTuneRepository{} + transformer := vat_tune.VatTuneTransformer{ + Config: vat_tune.VatTuneConfig, + Fetcher: &mocks.MockLogFetcher{}, + Converter: &vat_tune_mocks.MockVatTuneConverter{}, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(repository.PassedStartingBlockNumber).To(Equal(vat_tune.VatTuneConfig.StartingBlockNumber)) + Expect(repository.PassedEndingBlockNumber).To(Equal(vat_tune.VatTuneConfig.EndingBlockNumber)) + }) + + It("returns error if repository returns error for missing headers", func() { + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeadersErr(fakes.FakeError) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: &mocks.MockLogFetcher{}, + Converter: &vat_tune_mocks.MockVatTuneConverter{}, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("fetches logs for missing headers", func() { + fetcher := &mocks.MockLogFetcher{} + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}, {BlockNumber: 2}}) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: &vat_tune_mocks.MockVatTuneConverter{}, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(fetcher.FetchedBlocks).To(Equal([]int64{1, 2})) + Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{vat_tune.VatTuneConfig.ContractAddresses, vat_tune.VatTuneConfig.ContractAddresses})) + Expect(fetcher.FetchedTopics).To(Equal([][]common.Hash{{common.HexToHash(shared.VatTuneSignature)}})) + }) + + It("returns error if fetcher returns error", func() { + fetcher := &mocks.MockLogFetcher{} + fetcher.SetFetcherError(fakes.FakeError) + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: &vat_tune_mocks.MockVatTuneConverter{}, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("marks header checked if no logs returned", func() { + mockConverter := &vat_tune_mocks.MockVatTuneConverter{} + mockRepository := &vat_tune_mocks.MockVatTuneRepository{} + headerID := int64(123) + mockRepository.SetMissingHeaders([]core.Header{{Id: headerID}}) + mockFetcher := &mocks.MockLogFetcher{} + transformer := vat_tune.VatTuneTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + mockRepository.AssertMarkHeaderCheckedCalledWith(headerID) + }) + + It("returns error if marking header checked returns err", func() { + mockConverter := &vat_tune_mocks.MockVatTuneConverter{} + mockRepository := &vat_tune_mocks.MockVatTuneRepository{} + mockRepository.SetMissingHeaders([]core.Header{{Id: int64(123)}}) + mockRepository.SetMarkHeaderCheckedErr(fakes.FakeError) + mockFetcher := &mocks.MockLogFetcher{} + transformer := vat_tune.VatTuneTransformer{ + Converter: mockConverter, + Fetcher: mockFetcher, + Repository: mockRepository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("converts matching logs", func() { + converter := &vat_tune_mocks.MockVatTuneConverter{} + fetcher := &mocks.MockLogFetcher{} + fetcher.SetFetchedLogs([]types.Log{test_data.EthVatTuneLog}) + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: converter, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(converter.PassedLogs).To(Equal([]types.Log{test_data.EthVatTuneLog})) + }) + + It("returns error if converter returns error", func() { + converter := &vat_tune_mocks.MockVatTuneConverter{} + converter.SetConverterError(fakes.FakeError) + fetcher := &mocks.MockLogFetcher{} + fetcher.SetFetchedLogs([]types.Log{test_data.EthVatTuneLog}) + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}}) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: converter, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) + + It("persists vat tune model", func() { + converter := &vat_tune_mocks.MockVatTuneConverter{} + fetcher := &mocks.MockLogFetcher{} + fetcher.SetFetchedLogs([]types.Log{test_data.EthVatTuneLog}) + repository := &vat_tune_mocks.MockVatTuneRepository{} + fakeHeader := core.Header{BlockNumber: 1, Id: 2} + repository.SetMissingHeaders([]core.Header{fakeHeader}) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: converter, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).NotTo(HaveOccurred()) + Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id)) + Expect(repository.PassedModels).To(Equal([]vat_tune.VatTuneModel{test_data.VatTuneModel})) + }) + + It("returns error if repository returns error for create", func() { + converter := &vat_tune_mocks.MockVatTuneConverter{} + fetcher := &mocks.MockLogFetcher{} + fetcher.SetFetchedLogs([]types.Log{test_data.EthVatTuneLog}) + repository := &vat_tune_mocks.MockVatTuneRepository{} + repository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: 2}}) + repository.SetCreateError(fakes.FakeError) + transformer := vat_tune.VatTuneTransformer{ + Fetcher: fetcher, + Converter: converter, + Repository: repository, + } + + err := transformer.Execute() + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(fakes.FakeError)) + }) +}) diff --git a/pkg/transformers/vat_tune/vat_tune_suite_test.go b/pkg/transformers/vat_tune/vat_tune_suite_test.go new file mode 100644 index 00000000..c783052b --- /dev/null +++ b/pkg/transformers/vat_tune/vat_tune_suite_test.go @@ -0,0 +1,19 @@ +package vat_tune_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "io/ioutil" + "log" +) + +func TestVatTune(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "VatTune Suite") +} + +var _ = BeforeSuite(func() { + log.SetOutput(ioutil.Discard) +}) diff --git a/test_config/test_config.go b/test_config/test_config.go index dd0b6a02..6afd95ce 100644 --- a/test_config/test_config.go +++ b/test_config/test_config.go @@ -91,6 +91,7 @@ func CleanTestDB(db *postgres.DB) { db.MustExec("DELETE FROM maker.tend") db.MustExec("DELETE FROM maker.vat_init") db.MustExec("DELETE FROM maker.vat_toll") + db.MustExec("DELETE FROM maker.vat_tune") db.MustExec("DELETE FROM receipts") db.MustExec("DELETE FROM transactions") db.MustExec("DELETE FROM watched_contracts")