diff --git a/cmd/resync.go b/cmd/resync.go index 83aad59a..d61feff9 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -67,6 +67,7 @@ func init() { resyncCmd.PersistentFlags().Int("resync-batch-size", 0, "data fetching batch size") resyncCmd.PersistentFlags().Int("resync-batch-number", 0, "how many goroutines to fetch data concurrently") resyncCmd.PersistentFlags().Bool("resync-clear-old-cache", false, "if true, clear out old data of the provided type within the resync range before resyncing") + resyncCmd.PersistentFlags().Bool("resync-reset-validation", false, "if true, reset times_validated to 0") resyncCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node") resyncCmd.PersistentFlags().String("btc-password", "", "password for btc node") @@ -88,6 +89,7 @@ func init() { viper.BindPFlag("resync.batchSize", resyncCmd.PersistentFlags().Lookup("resync-batch-size")) viper.BindPFlag("resync.batchNumber", resyncCmd.PersistentFlags().Lookup("resync-batch-number")) viper.BindPFlag("resync.clearOldCache", resyncCmd.PersistentFlags().Lookup("resync-clear-old-cache")) + viper.BindPFlag("resync.resetValidation", resyncCmd.PersistentFlags().Lookup("resync-reset-validation")) viper.BindPFlag("bitcoin.httpPath", resyncCmd.PersistentFlags().Lookup("btc-http-path")) viper.BindPFlag("bitcoin.pass", resyncCmd.PersistentFlags().Lookup("btc-password")) diff --git a/cmd/superNode.go b/cmd/superNode.go index b1239fd2..f9e8ae31 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -117,6 +117,7 @@ func init() { superNodeCmd.PersistentFlags().Int("supernode-frequency", 0, "how often (in seconds) the backfill process checks for gaps") superNodeCmd.PersistentFlags().Int("supernode-batch-size", 0, "data fetching batch size") superNodeCmd.PersistentFlags().Int("supernode-batch-number", 0, "how many goroutines to fetch data concurrently") + superNodeCmd.PersistentFlags().Int("supernode-validation-level", 0, "backfill will resync any data below this level") superNodeCmd.PersistentFlags().String("btc-ws-path", "", "ws url for bitcoin node") superNodeCmd.PersistentFlags().String("btc-http-path", "", "http url for bitcoin node") @@ -144,6 +145,7 @@ func init() { viper.BindPFlag("superNode.frequency", superNodeCmd.PersistentFlags().Lookup("supernode-frequency")) viper.BindPFlag("superNode.batchSize", superNodeCmd.PersistentFlags().Lookup("supernode-batch-size")) viper.BindPFlag("superNode.batchNumber", superNodeCmd.PersistentFlags().Lookup("supernode-batch-number")) + viper.BindPFlag("superNode.validationLevel", superNodeCmd.PersistentFlags().Lookup("supernode-validation-level")) viper.BindPFlag("bitcoin.wsPath", superNodeCmd.PersistentFlags().Lookup("btc-ws-path")) viper.BindPFlag("bitcoin.httpPath", superNodeCmd.PersistentFlags().Lookup("btc-http-path")) diff --git a/db/migrations/00032_update_receipt_cids.sql b/db/migrations/00032_update_receipt_cids.sql new file mode 100644 index 00000000..15ec931e --- /dev/null +++ b/db/migrations/00032_update_receipt_cids.sql @@ -0,0 +1,22 @@ +-- +goose Up +ALTER TABLE eth.receipt_cids +ADD COLUMN log_contracts VARCHAR(66)[]; + +ALTER TABLE eth.receipt_cids +ADD COLUMN contract_hash VARCHAR(66); + +WITH uniques AS (SELECT DISTINCT ON (tx_id) * FROM eth.receipt_cids) +DELETE FROM eth.receipt_cids WHERE receipt_cids.id NOT IN (SELECT id FROM uniques); + +ALTER TABLE eth.receipt_cids +ADD CONSTRAINT receipt_cids_tx_id_key UNIQUE (tx_id); + +-- +goose Down +ALTER TABLE eth.receipt_cids +DROP CONSTRAINT receipt_cids_tx_id_key; + +ALTER TABLE eth.receipt_cids +DROP COLUMN contract_hash; + +ALTER TABLE eth.receipt_cids +DROP COLUMN log_contracts; \ No newline at end of file diff --git a/db/migrations/00033_add_times_validated.sql b/db/migrations/00033_add_times_validated.sql new file mode 100644 index 00000000..eb8cbd27 --- /dev/null +++ b/db/migrations/00033_add_times_validated.sql @@ -0,0 +1,13 @@ +-- +goose Up +ALTER TABLE eth.header_cids +ADD COLUMN times_validated INTEGER NOT NULL DEFAULT 1; + +ALTER TABLE btc.header_cids +ADD COLUMN times_validated INTEGER NOT NULL DEFAULT 1; + +-- +goose Down +ALTER TABLE btc.header_cids +DROP COLUMN times_validated; + +ALTER TABLE eth.header_cids +DROP COLUMN times_validated; \ No newline at end of file diff --git a/db/migrations/maaaybe/00026_create_eth_headers_table.sql b/db/migrations/maaaybe/00026_create_eth_headers_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00027_create_eth_uncles_table.sql b/db/migrations/maaaybe/00027_create_eth_uncles_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00028_create_eth_transactions_table.sql b/db/migrations/maaaybe/00028_create_eth_transactions_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00029_create_eth_receipts_table.sql b/db/migrations/maaaybe/00029_create_eth_receipts_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00030_create_eth_logs_table.sql b/db/migrations/maaaybe/00030_create_eth_logs_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00031_create_eth_accounts_table.sql b/db/migrations/maaaybe/00031_create_eth_accounts_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00032_create_eth_storage_leaf_table.sql b/db/migrations/maaaybe/00032_create_eth_storage_leaf_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00033_create_btc_headers_table.sql b/db/migrations/maaaybe/00033_create_btc_headers_table.sql deleted file mode 100644 index e69de29b..00000000 diff --git a/db/migrations/maaaybe/00034_create_btc_transactions_table.sql.go b/db/migrations/maaaybe/00034_create_btc_transactions_table.sql.go deleted file mode 100644 index f7436b85..00000000 --- a/db/migrations/maaaybe/00034_create_btc_transactions_table.sql.go +++ /dev/null @@ -1 +0,0 @@ -package maaaybe diff --git a/db/schema.sql b/db/schema.sql index 4b287ed9..36b5ffb1 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -60,7 +60,8 @@ CREATE TABLE btc.header_cids ( cid text NOT NULL, "timestamp" numeric NOT NULL, bits bigint NOT NULL, - node_id integer NOT NULL + node_id integer NOT NULL, + times_validated integer DEFAULT 1 NOT NULL ); @@ -267,7 +268,8 @@ CREATE TABLE eth.header_cids ( receipt_root character varying(66), uncle_root character varying(66), bloom bytea, - "timestamp" numeric + "timestamp" numeric, + times_validated integer DEFAULT 1 NOT NULL ); @@ -355,7 +357,9 @@ CREATE TABLE eth.receipt_cids ( topic0s character varying(66)[], topic1s character varying(66)[], topic2s character varying(66)[], - topic3s character varying(66)[] + topic3s character varying(66)[], + log_contracts character varying(66)[], + contract_hash character varying(66) ); @@ -1258,6 +1262,14 @@ ALTER TABLE ONLY eth.receipt_cids ADD CONSTRAINT receipt_cids_pkey PRIMARY KEY (id); +-- +-- Name: receipt_cids receipt_cids_tx_id_key; Type: CONSTRAINT; Schema: eth; Owner: - +-- + +ALTER TABLE ONLY eth.receipt_cids + ADD CONSTRAINT receipt_cids_tx_id_key UNIQUE (tx_id); + + -- -- Name: state_accounts state_accounts_pkey; Type: CONSTRAINT; Schema: eth; Owner: - -- diff --git a/dockerfiles/super_node/entrypoint.sh b/dockerfiles/super_node/entrypoint.sh index 321af8e4..77fa4778 100755 --- a/dockerfiles/super_node/entrypoint.sh +++ b/dockerfiles/super_node/entrypoint.sh @@ -58,8 +58,13 @@ DEFAULT_OPTIONS="--config=$VDB_CONFIG_FILE" VDB_FULL_CL=${VDB_FULL_CL:-$VDB_COMMAND $DEFAULT_OPTIONS} echo running: ./vulcanizedb $VDB_FULL_CL $@ -# XXX need to lose the env vars -./vulcanizedb $@ +vdb_args="$@" +# default is to use the config passed by the build arg +if [[ -z "$vdb_args" ]]; + vdb_args="--config=config.toml" +fi + +./vulcanizedb $vdb_args rv=$? if [ $rv != 0 ]; then diff --git a/environments/superNodeBTC.toml b/environments/superNodeBTC.toml index 99171a9d..2f114137 100644 --- a/environments/superNodeBTC.toml +++ b/environments/superNodeBTC.toml @@ -16,6 +16,7 @@ batchSize = 1 # $RESYNC_BATCH_SIZE batchNumber = 50 # $RESYNC_BATCH_NUMBER clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE + resetValidation = true # $RESYNC_RESET_VALIDATION [superNode] chain = "bitcoin" # $SUPERNODE_CHAIN @@ -29,6 +30,7 @@ frequency = 45 # $SUPERNODE_FREQUENCY batchSize = 1 # $SUPERNODE_BATCH_SIZE batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL [bitcoin] wsPath = "127.0.0.1:8332" # $BTC_WS_PATH diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 69e3b32d..4f1f3730 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -13,9 +13,10 @@ type = "state" # $RESYNC_TYPE start = 0 # $RESYNC_START stop = 0 # $RESYNC_STOP - batchSize = 5 # $RESYNC_BATCH_SIZE - batchNumber = 50 # $RESYNC_BATCH_NUMBER - clearOldCache = true # $RESYNC_CLEAR_OLD_CACHE + batchSize = 10 # $RESYNC_BATCH_SIZE + batchNumber = 100 # $RESYNC_BATCH_NUMBER + clearOldCache = false # $RESYNC_CLEAR_OLD_CACHE + resetValidation = true # $RESYNC_RESET_VALIDATION [superNode] chain = "ethereum" # $SUPERNODE_CHAIN @@ -29,6 +30,7 @@ frequency = 15 # $SUPERNODE_FREQUENCY batchSize = 5 # $SUPERNODE_BATCH_SIZE batchNumber = 50 # $SUPERNODE_BATCH_NUMBER + validationLevel = 1 # $SUPERNODE_VALIDATION_LEVEL [ethereum] wsPath = "127.0.0.1:8546" # $ETH_WS_PATH diff --git a/libraries/shared/storage/utils/bins_test.go b/libraries/shared/storage/utils/bins_test.go index 84f9405e..19985b3b 100644 --- a/libraries/shared/storage/utils/bins_test.go +++ b/libraries/shared/storage/utils/bins_test.go @@ -44,6 +44,14 @@ var _ = Describe("GetBlockHeightBins", func() { Expect(err).ToNot(HaveOccurred()) Expect(len(blockRangeBins)).To(Equal(100)) Expect(blockRangeBins[99]).To(Equal(lastBin)) + + startingBlock = 1 + endingBlock = 1 + batchSize = 100 + blockRangeBins, err = utils.GetBlockHeightBins(startingBlock, endingBlock, batchSize) + Expect(err).ToNot(HaveOccurred()) + Expect(len(blockRangeBins)).To(Equal(1)) + Expect(blockRangeBins[0]).To(Equal([]uint64{1})) }) It("throws an error if the starting block is higher than the ending block", func() { diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 247a30f4..29059253 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -63,6 +63,8 @@ type BackFillService struct { QuitChan chan bool // Chain type chain shared.ChainType + // Headers with times_validated lower than this will be resynced + validationLevel int } // NewBackFillService returns a new BackFillInterface @@ -107,6 +109,7 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert ScreenAndServeChan: screenAndServeChan, QuitChan: settings.Quit, chain: settings.Chain, + validationLevel: settings.ValidationLevel, }, nil } @@ -135,7 +138,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { log.Error(err) } } - gaps, err := bfs.Retriever.RetrieveGapsInData() + gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) if err != nil { log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err) continue @@ -158,7 +161,6 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { if endingBlock < startingBlock { return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) } - // // break the range up into bins of smaller ranges blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) if err != nil { diff --git a/pkg/super_node/btc/cleaner.go b/pkg/super_node/btc/cleaner.go index cf39cd5b..95a00b27 100644 --- a/pkg/super_node/btc/cleaner.go +++ b/pkg/super_node/btc/cleaner.go @@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner { } } +// ResetValidation resets the validation level to 0 to enable revalidation +func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { + tx, err := c.db.Beginx() + if err != nil { + return err + } + for _, rng := range rngs { + logrus.Infof("btc db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1]) + pgStr := `UPDATE btc.header_cids + SET times_validated = 0 + WHERE block_number BETWEEN $1 AND $2` + if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + return err + } + } + return tx.Commit() +} + // Clean removes the specified data from the db within the provided block range func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error { tx, err := c.db.Beginx() diff --git a/pkg/super_node/btc/cleaner_test.go b/pkg/super_node/btc/cleaner_test.go index a245a294..86efd607 100644 --- a/pkg/super_node/btc/cleaner_test.go +++ b/pkg/super_node/btc/cleaner_test.go @@ -285,4 +285,58 @@ var _ = Describe("Cleaner", func() { Expect(headerCount).To(Equal(2)) }) }) + + Describe("ResetValidation", func() { + BeforeEach(func() { + err := repo.Index(mockCIDPayload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(mockCIDPayload2) + Expect(err).ToNot(HaveOccurred()) + + var validationTimes []int + pgStr := `SELECT times_validated FROM btc.header_cids` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(1)) + Expect(validationTimes[1]).To(Equal(1)) + + err = repo.Index(mockCIDPayload1) + Expect(err).ToNot(HaveOccurred()) + + validationTimes = []int{} + pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(2)) + Expect(validationTimes[1]).To(Equal(1)) + }) + AfterEach(func() { + btc.TearDownDB(db) + }) + It("Resets the validation level", func() { + err := cleaner.ResetValidation(rngs) + Expect(err).ToNot(HaveOccurred()) + + var validationTimes []int + pgStr := `SELECT times_validated FROM btc.header_cids` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(0)) + Expect(validationTimes[1]).To(Equal(0)) + + err = repo.Index(mockCIDPayload2) + Expect(err).ToNot(HaveOccurred()) + + validationTimes = []int{} + pgStr = `SELECT times_validated FROM btc.header_cids ORDER BY block_number` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(0)) + Expect(validationTimes[1]).To(Equal(1)) + }) + }) }) diff --git a/pkg/super_node/btc/indexer.go b/pkg/super_node/btc/indexer.go index b042ce74..a6dda9cf 100644 --- a/pkg/super_node/btc/indexer.go +++ b/pkg/super_node/btc/indexer.go @@ -61,11 +61,11 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel, nodeID int64) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id) - VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id) = ($3, $4, $5, $6, $7) + err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id, times_validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id, times_validated) = ($3, $4, $5, $6, $7, btc.header_cids.times_validated + 1) RETURNING id`, - header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, nodeID).Scan(&headerID) + header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, nodeID, 1).Scan(&headerID) return headerID, err } diff --git a/pkg/super_node/btc/models.go b/pkg/super_node/btc/models.go index 66a1d221..1317a1bc 100644 --- a/pkg/super_node/btc/models.go +++ b/pkg/super_node/btc/models.go @@ -20,14 +20,15 @@ import "github.com/lib/pq" // HeaderModel is the db model for btc.header_cids table type HeaderModel struct { - ID int64 `db:"id"` - BlockNumber string `db:"block_number"` - BlockHash string `db:"block_hash"` - ParentHash string `db:"parent_hash"` - CID string `db:"cid"` - Timestamp int64 `db:"timestamp"` - Bits uint32 `db:"bits"` - NodeID int64 `db:"node_id"` + ID int64 `db:"id"` + BlockNumber string `db:"block_number"` + BlockHash string `db:"block_hash"` + ParentHash string `db:"parent_hash"` + CID string `db:"cid"` + Timestamp int64 `db:"timestamp"` + Bits uint32 `db:"bits"` + NodeID int64 `db:"node_id"` + TimesValidated int64 `db:"times_validated"` } // TxModel is the db model for btc.transaction_cids table diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go index 864812bd..4a08cbfd 100644 --- a/pkg/super_node/btc/retriever.go +++ b/pkg/super_node/btc/retriever.go @@ -161,7 +161,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db -func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { +func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1 LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number @@ -171,18 +171,45 @@ func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { Start uint64 `db:"start"` Stop uint64 `db:"stop"` }, 0) - err := ecr.db.Select(&results, pgStr) - if err != nil { + if err := ecr.db.Select(&results, pgStr); err != nil { return nil, err } - gaps := make([]shared.Gap, len(results)) + emptyGaps := make([]shared.Gap, len(results)) for i, res := range results { - gaps[i] = shared.Gap{ + emptyGaps[i] = shared.Gap{ Start: res.Start, Stop: res.Stop, } } - return gaps, nil + + // Find sections of blocks where we are below the validation level + // There will be no overlap between these "gaps" and the ones above + pgStr = `SELECT block_number FROM btc.header_cids + WHERE times_validated < $1 + ORDER BY block_number` + var heights []uint64 + if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil { + return nil, err + } + if len(heights) == 0 { + return emptyGaps, nil + } + validationGaps := make([]shared.Gap, 0) + start := heights[0] + lastHeight := start + for _, height := range heights[1:] { + if height == lastHeight+1 { + lastHeight = height + continue + } + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: lastHeight, + }) + start = height + lastHeight = start + } + return append(emptyGaps, validationGaps...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index e8afe345..5614c1e2 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -33,17 +33,18 @@ import ( // Env variables const ( - SUPERNODE_CHAIN = "SUPERNODE_CHAIN" - SUPERNODE_SYNC = "SUPERNODE_SYNC" - SUPERNODE_WORKERS = "SUPERNODE_WORKERS" - SUPERNODE_SERVER = "SUPERNODE_SERVER" - SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH" - SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH" - SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH" - SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL" - SUPERNODE_FREQUENCY = "SUPERNODE_FREQUENCY" - SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE" - SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER" + SUPERNODE_CHAIN = "SUPERNODE_CHAIN" + SUPERNODE_SYNC = "SUPERNODE_SYNC" + SUPERNODE_WORKERS = "SUPERNODE_WORKERS" + SUPERNODE_SERVER = "SUPERNODE_SERVER" + SUPERNODE_WS_PATH = "SUPERNODE_WS_PATH" + SUPERNODE_IPC_PATH = "SUPERNODE_IPC_PATH" + SUPERNODE_HTTP_PATH = "SUPERNODE_HTTP_PATH" + SUPERNODE_BACKFILL = "SUPERNODE_BACKFILL" + SUPERNODE_FREQUENCY = "SUPERNODE_FREQUENCY" + SUPERNODE_BATCH_SIZE = "SUPERNODE_BATCH_SIZE" + SUPERNODE_BATCH_NUMBER = "SUPERNODE_BATCH_NUMBER" + SUPERNODE_VALIDATION_LEVEL = "SUPERNODE_VALIDATION_LEVEL" ) // Config struct @@ -65,11 +66,12 @@ type Config struct { WSClient interface{} NodeInfo core.Node // Backfiller params - BackFill bool - HTTPClient interface{} - Frequency time.Duration - BatchSize uint64 - BatchNumber uint64 + BackFill bool + HTTPClient interface{} + Frequency time.Duration + BatchSize uint64 + BatchNumber uint64 + ValidationLevel int } // NewSuperNodeConfig is used to initialize a SuperNode config from a .toml file @@ -167,6 +169,7 @@ func (c *Config) BackFillFields() error { viper.BindEnv("superNode.frequency", SUPERNODE_FREQUENCY) viper.BindEnv("superNode.batchSize", SUPERNODE_BATCH_SIZE) viper.BindEnv("superNode.batchNumber", SUPERNODE_BATCH_NUMBER) + viper.BindEnv("superNode.validationLevel", SUPERNODE_VALIDATION_LEVEL) switch c.Chain { case shared.Ethereum: @@ -190,5 +193,6 @@ func (c *Config) BackFillFields() error { c.Frequency = frequency c.BatchSize = uint64(viper.GetInt64("superNode.batchSize")) c.BatchNumber = uint64(viper.GetInt64("superNode.batchNumber")) + c.ValidationLevel = viper.GetInt("superNode.validationLevel") return nil } diff --git a/pkg/super_node/eth/api.go b/pkg/super_node/eth/api.go index cd80453c..f882bbb3 100644 --- a/pkg/super_node/eth/api.go +++ b/pkg/super_node/eth/api.go @@ -20,13 +20,10 @@ import ( "context" "math/big" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ) @@ -73,8 +70,8 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) } } filter := ReceiptFilter{ - Contracts: addrStrs, - Topics: topicStrSets, + LogAddresses: addrStrs, + Topics: topicStrSets, } tx, err := pea.b.DB.Beginx() if err != nil { @@ -181,206 +178,3 @@ func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.H // Transaction unknown, return as such return nil, nil } - -// extractLogsOfInterest returns logs from the receipt IPLD -func extractLogsOfInterest(rctIPLDs []ipfs.BlockModel, wantedTopics [][]string) ([]*types.Log, error) { - var logs []*types.Log - for _, rctIPLD := range rctIPLDs { - rctRLP := rctIPLD - var rct types.Receipt - if err := rlp.DecodeBytes(rctRLP.Data, &rct); err != nil { - return nil, err - } - for _, log := range rct.Logs { - if wanted := wantedLog(wantedTopics, log.Topics); wanted == true { - logs = append(logs, log) - } - } - } - return logs, nil -} - -// returns true if the log matches on the filter -func wantedLog(wantedTopics [][]string, actualTopics []common.Hash) bool { - // actualTopics will always have length <= 4 - // wantedTopics will always have length 4 - matches := 0 - for i, actualTopic := range actualTopics { - // If we have topics in this filter slot, count as a match if the actualTopic matches one of the ones in this filter slot - if len(wantedTopics[i]) > 0 { - matches += sliceContainsHash(wantedTopics[i], actualTopic) - } else { - // Filter slot is empty, not matching any topics at this slot => counts as a match - matches++ - } - } - if matches == len(actualTopics) { - return true - } - return false -} - -// returns 1 if the slice contains the hash, 0 if it does not -func sliceContainsHash(slice []string, hash common.Hash) int { - for _, str := range slice { - if str == hash.String() { - return 1 - } - } - return 0 -} - -// rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires -// a `PublicEthAPI`. -func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) { - fields := RPCMarshalHeader(header) - td, err := pea.b.GetTd(header.Hash()) - if err != nil { - return nil, err - } - fields["totalDifficulty"] = (*hexutil.Big)(td) - return fields, nil -} - -// RPCMarshalHeader converts the given header to the RPC output. -// This function is eth/internal so we have to make our own version here... -func RPCMarshalHeader(head *types.Header) map[string]interface{} { - return map[string]interface{}{ - "number": (*hexutil.Big)(head.Number), - "hash": head.Hash(), - "parentHash": head.ParentHash, - "nonce": head.Nonce, - "mixHash": head.MixDigest, - "sha3Uncles": head.UncleHash, - "logsBloom": head.Bloom, - "stateRoot": head.Root, - "miner": head.Coinbase, - "difficulty": (*hexutil.Big)(head.Difficulty), - "extraData": hexutil.Bytes(head.Extra), - "size": hexutil.Uint64(head.Size()), - "gasLimit": hexutil.Uint64(head.GasLimit), - "gasUsed": hexutil.Uint64(head.GasUsed), - "timestamp": hexutil.Uint64(head.Time), - "transactionsRoot": head.TxHash, - "receiptsRoot": head.ReceiptHash, - } -} - -// rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires -// a `PublicBlockchainAPI`. -func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { - fields, err := RPCMarshalBlock(b, inclTx, fullTx) - if err != nil { - return nil, err - } - td, err := pea.b.GetTd(b.Hash()) - if err != nil { - return nil, err - } - fields["totalDifficulty"] = (*hexutil.Big)(td) - return fields, err -} - -// RPCMarshalBlock converts the given block to the RPC output which depends on fullTx. If inclTx is true transactions are -// returned. When fullTx is true the returned block contains full transaction details, otherwise it will only contain -// transaction hashes. -func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { - fields := RPCMarshalHeader(block.Header()) - fields["size"] = hexutil.Uint64(block.Size()) - - if inclTx { - formatTx := func(tx *types.Transaction) (interface{}, error) { - return tx.Hash(), nil - } - if fullTx { - formatTx = func(tx *types.Transaction) (interface{}, error) { - return NewRPCTransactionFromBlockHash(block, tx.Hash()), nil - } - } - txs := block.Transactions() - transactions := make([]interface{}, len(txs)) - var err error - for i, tx := range txs { - if transactions[i], err = formatTx(tx); err != nil { - return nil, err - } - } - fields["transactions"] = transactions - } - uncles := block.Uncles() - uncleHashes := make([]common.Hash, len(uncles)) - for i, uncle := range uncles { - uncleHashes[i] = uncle.Hash() - } - fields["uncles"] = uncleHashes - - return fields, nil -} - -// NewRPCTransactionFromBlockHash returns a transaction that will serialize to the RPC representation. -func NewRPCTransactionFromBlockHash(b *types.Block, hash common.Hash) *RPCTransaction { - for idx, tx := range b.Transactions() { - if tx.Hash() == hash { - return newRPCTransactionFromBlockIndex(b, uint64(idx)) - } - } - return nil -} - -// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation. -func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransaction { - txs := b.Transactions() - if index >= uint64(len(txs)) { - return nil - } - return NewRPCTransaction(txs[index], b.Hash(), b.NumberU64(), index) -} - -// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction -type RPCTransaction struct { - BlockHash *common.Hash `json:"blockHash"` - BlockNumber *hexutil.Big `json:"blockNumber"` - From common.Address `json:"from"` - Gas hexutil.Uint64 `json:"gas"` - GasPrice *hexutil.Big `json:"gasPrice"` - Hash common.Hash `json:"hash"` - Input hexutil.Bytes `json:"input"` - Nonce hexutil.Uint64 `json:"nonce"` - To *common.Address `json:"to"` - TransactionIndex *hexutil.Uint64 `json:"transactionIndex"` - Value *hexutil.Big `json:"value"` - V *hexutil.Big `json:"v"` - R *hexutil.Big `json:"r"` - S *hexutil.Big `json:"s"` -} - -// NewRPCTransaction returns a transaction that will serialize to the RPC -// representation, with the given location metadata set (if available). -func NewRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber uint64, index uint64) *RPCTransaction { - var signer types.Signer = types.FrontierSigner{} - if tx.Protected() { - signer = types.NewEIP155Signer(tx.ChainId()) - } - from, _ := types.Sender(signer, tx) - v, r, s := tx.RawSignatureValues() - - result := &RPCTransaction{ - From: from, - Gas: hexutil.Uint64(tx.Gas()), - GasPrice: (*hexutil.Big)(tx.GasPrice()), - Hash: tx.Hash(), - Input: hexutil.Bytes(tx.Data()), // somehow this is ending up `nil` - Nonce: hexutil.Uint64(tx.Nonce()), - To: tx.To(), - Value: (*hexutil.Big)(tx.Value()), - V: (*hexutil.Big)(v), - R: (*hexutil.Big)(r), - S: (*hexutil.Big)(s), - } - if blockHash != (common.Hash{}) { - result.BlockHash = &blockHash - result.BlockNumber = (*hexutil.Big)(new(big.Int).SetUint64(blockNumber)) - result.TransactionIndex = (*hexutil.Uint64)(&index) - } - return result -} diff --git a/pkg/super_node/eth/api_test.go b/pkg/super_node/eth/api_test.go index 61915b62..60106aee 100644 --- a/pkg/super_node/eth/api_test.go +++ b/pkg/super_node/eth/api_test.go @@ -101,8 +101,10 @@ var _ = Describe("API", func() { mocks.HeaderCID: mocks.HeaderIPLD, mocks.Trx1CID: mocks.Trx1IPLD, mocks.Trx2CID: mocks.Trx2IPLD, + mocks.Trx3CID: mocks.Trx3IPLD, mocks.Rct1CID: mocks.Rct1IPLD, mocks.Rct2CID: mocks.Rct2IPLD, + mocks.Rct3CID: mocks.Rct3IPLD, mocks.State1CID: mocks.State1IPLD, mocks.State2CID: mocks.State2IPLD, mocks.StorageCID: mocks.StorageIPLD, diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index 0b23a0d5..1365c47c 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -23,11 +23,13 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/postgres" ) @@ -309,3 +311,206 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type } return &transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), nil } + +// extractLogsOfInterest returns logs from the receipt IPLD +func extractLogsOfInterest(rctIPLDs []ipfs.BlockModel, wantedTopics [][]string) ([]*types.Log, error) { + var logs []*types.Log + for _, rctIPLD := range rctIPLDs { + rctRLP := rctIPLD + var rct types.Receipt + if err := rlp.DecodeBytes(rctRLP.Data, &rct); err != nil { + return nil, err + } + for _, log := range rct.Logs { + if wanted := wantedLog(wantedTopics, log.Topics); wanted == true { + logs = append(logs, log) + } + } + } + return logs, nil +} + +// returns true if the log matches on the filter +func wantedLog(wantedTopics [][]string, actualTopics []common.Hash) bool { + // actualTopics will always have length <= 4 + // wantedTopics will always have length 4 + matches := 0 + for i, actualTopic := range actualTopics { + // If we have topics in this filter slot, count as a match if the actualTopic matches one of the ones in this filter slot + if len(wantedTopics[i]) > 0 { + matches += sliceContainsHash(wantedTopics[i], actualTopic) + } else { + // Filter slot is empty, not matching any topics at this slot => counts as a match + matches++ + } + } + if matches == len(actualTopics) { + return true + } + return false +} + +// returns 1 if the slice contains the hash, 0 if it does not +func sliceContainsHash(slice []string, hash common.Hash) int { + for _, str := range slice { + if str == hash.String() { + return 1 + } + } + return 0 +} + +// rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires +// a `PublicEthAPI`. +func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) { + fields := RPCMarshalHeader(header) + td, err := pea.b.GetTd(header.Hash()) + if err != nil { + return nil, err + } + fields["totalDifficulty"] = (*hexutil.Big)(td) + return fields, nil +} + +// RPCMarshalHeader converts the given header to the RPC output. +// This function is eth/internal so we have to make our own version here... +func RPCMarshalHeader(head *types.Header) map[string]interface{} { + return map[string]interface{}{ + "number": (*hexutil.Big)(head.Number), + "hash": head.Hash(), + "parentHash": head.ParentHash, + "nonce": head.Nonce, + "mixHash": head.MixDigest, + "sha3Uncles": head.UncleHash, + "logsBloom": head.Bloom, + "stateRoot": head.Root, + "miner": head.Coinbase, + "difficulty": (*hexutil.Big)(head.Difficulty), + "extraData": hexutil.Bytes(head.Extra), + "size": hexutil.Uint64(head.Size()), + "gasLimit": hexutil.Uint64(head.GasLimit), + "gasUsed": hexutil.Uint64(head.GasUsed), + "timestamp": hexutil.Uint64(head.Time), + "transactionsRoot": head.TxHash, + "receiptsRoot": head.ReceiptHash, + } +} + +// rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires +// a `PublicBlockchainAPI`. +func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { + fields, err := RPCMarshalBlock(b, inclTx, fullTx) + if err != nil { + return nil, err + } + td, err := pea.b.GetTd(b.Hash()) + if err != nil { + return nil, err + } + fields["totalDifficulty"] = (*hexutil.Big)(td) + return fields, err +} + +// RPCMarshalBlock converts the given block to the RPC output which depends on fullTx. If inclTx is true transactions are +// returned. When fullTx is true the returned block contains full transaction details, otherwise it will only contain +// transaction hashes. +func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { + fields := RPCMarshalHeader(block.Header()) + fields["size"] = hexutil.Uint64(block.Size()) + + if inclTx { + formatTx := func(tx *types.Transaction) (interface{}, error) { + return tx.Hash(), nil + } + if fullTx { + formatTx = func(tx *types.Transaction) (interface{}, error) { + return NewRPCTransactionFromBlockHash(block, tx.Hash()), nil + } + } + txs := block.Transactions() + transactions := make([]interface{}, len(txs)) + var err error + for i, tx := range txs { + if transactions[i], err = formatTx(tx); err != nil { + return nil, err + } + } + fields["transactions"] = transactions + } + uncles := block.Uncles() + uncleHashes := make([]common.Hash, len(uncles)) + for i, uncle := range uncles { + uncleHashes[i] = uncle.Hash() + } + fields["uncles"] = uncleHashes + + return fields, nil +} + +// NewRPCTransactionFromBlockHash returns a transaction that will serialize to the RPC representation. +func NewRPCTransactionFromBlockHash(b *types.Block, hash common.Hash) *RPCTransaction { + for idx, tx := range b.Transactions() { + if tx.Hash() == hash { + return newRPCTransactionFromBlockIndex(b, uint64(idx)) + } + } + return nil +} + +// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation. +func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransaction { + txs := b.Transactions() + if index >= uint64(len(txs)) { + return nil + } + return NewRPCTransaction(txs[index], b.Hash(), b.NumberU64(), index) +} + +// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction +type RPCTransaction struct { + BlockHash *common.Hash `json:"blockHash"` + BlockNumber *hexutil.Big `json:"blockNumber"` + From common.Address `json:"from"` + Gas hexutil.Uint64 `json:"gas"` + GasPrice *hexutil.Big `json:"gasPrice"` + Hash common.Hash `json:"hash"` + Input hexutil.Bytes `json:"input"` + Nonce hexutil.Uint64 `json:"nonce"` + To *common.Address `json:"to"` + TransactionIndex *hexutil.Uint64 `json:"transactionIndex"` + Value *hexutil.Big `json:"value"` + V *hexutil.Big `json:"v"` + R *hexutil.Big `json:"r"` + S *hexutil.Big `json:"s"` +} + +// NewRPCTransaction returns a transaction that will serialize to the RPC +// representation, with the given location metadata set (if available). +func NewRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber uint64, index uint64) *RPCTransaction { + var signer types.Signer = types.FrontierSigner{} + if tx.Protected() { + signer = types.NewEIP155Signer(tx.ChainId()) + } + from, _ := types.Sender(signer, tx) + v, r, s := tx.RawSignatureValues() + + result := &RPCTransaction{ + From: from, + Gas: hexutil.Uint64(tx.Gas()), + GasPrice: (*hexutil.Big)(tx.GasPrice()), + Hash: tx.Hash(), + Input: hexutil.Bytes(tx.Data()), // somehow this is ending up `nil` + Nonce: hexutil.Uint64(tx.Nonce()), + To: tx.To(), + Value: (*hexutil.Big)(tx.Value()), + V: (*hexutil.Big)(v), + R: (*hexutil.Big)(r), + S: (*hexutil.Big)(s), + } + if blockHash != (common.Hash{}) { + result.BlockHash = &blockHash + result.BlockNumber = (*hexutil.Big)(new(big.Int).SetUint64(blockNumber)) + result.TransactionIndex = (*hexutil.Uint64)(&index) + } + return result +} diff --git a/pkg/super_node/eth/cleaner.go b/pkg/super_node/eth/cleaner.go index c3496504..c07dd063 100644 --- a/pkg/super_node/eth/cleaner.go +++ b/pkg/super_node/eth/cleaner.go @@ -38,6 +38,24 @@ func NewCleaner(db *postgres.DB) *Cleaner { } } +// ResetValidation resets the validation level to 0 to enable revalidation +func (c *Cleaner) ResetValidation(rngs [][2]uint64) error { + tx, err := c.db.Beginx() + if err != nil { + return err + } + for _, rng := range rngs { + logrus.Infof("eth db cleaner resetting validation level to 0 for block range %d to %d", rng[0], rng[1]) + pgStr := `UPDATE eth.header_cids + SET times_validated = 0 + WHERE block_number BETWEEN $1 AND $2` + if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil { + return err + } + } + return tx.Commit() +} + // Clean removes the specified data from the db within the provided block range func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error { tx, err := c.db.Beginx() diff --git a/pkg/super_node/eth/cleaner_test.go b/pkg/super_node/eth/cleaner_test.go index 351ce77d..3ced56ac 100644 --- a/pkg/super_node/eth/cleaner_test.go +++ b/pkg/super_node/eth/cleaner_test.go @@ -89,12 +89,12 @@ var ( rct2Contract = common.HexToAddress("0x010c") receiptModels1 = map[common.Hash]eth2.ReceiptModel{ tx1Hash: { - CID: rct1CID, - Contract: rct1Contract.String(), + CID: rct1CID, + ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(), }, tx2Hash: { - CID: rct2CID, - Contract: rct2Contract.String(), + CID: rct2CID, + ContractHash: crypto.Keccak256Hash(rct2Contract.Bytes()).String(), }, } @@ -170,8 +170,8 @@ var ( rct3CID = "mockRct3CID" receiptModels2 = map[common.Hash]eth2.ReceiptModel{ tx3Hash: { - CID: rct3CID, - Contract: rct1Contract.String(), + CID: rct3CID, + ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(), }, } @@ -611,4 +611,58 @@ var _ = Describe("Cleaner", func() { Expect(blocksCount).To(Equal(12)) }) }) + + Describe("ResetValidation", func() { + BeforeEach(func() { + err := repo.Index(mockCIDPayload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(mockCIDPayload2) + Expect(err).ToNot(HaveOccurred()) + + var validationTimes []int + pgStr := `SELECT times_validated FROM eth.header_cids` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(1)) + Expect(validationTimes[1]).To(Equal(1)) + + err = repo.Index(mockCIDPayload1) + Expect(err).ToNot(HaveOccurred()) + + validationTimes = []int{} + pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(2)) + Expect(validationTimes[1]).To(Equal(1)) + }) + AfterEach(func() { + eth.TearDownDB(db) + }) + It("Resets the validation level", func() { + err := cleaner.ResetValidation(rngs) + Expect(err).ToNot(HaveOccurred()) + + var validationTimes []int + pgStr := `SELECT times_validated FROM eth.header_cids` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(0)) + Expect(validationTimes[1]).To(Equal(0)) + + err = repo.Index(mockCIDPayload2) + Expect(err).ToNot(HaveOccurred()) + + validationTimes = []int{} + pgStr = `SELECT times_validated FROM eth.header_cids ORDER BY block_number` + err = db.Select(&validationTimes, pgStr) + Expect(err).ToNot(HaveOccurred()) + Expect(len(validationTimes)).To(Equal(2)) + Expect(validationTimes[0]).To(Equal(0)) + Expect(validationTimes[1]).To(Equal(1)) + }) + }) }) diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index 61a7785b..d6287293 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -72,8 +72,8 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert return nil, err } txMeta := TxModel{ - Dst: shared.HandleNullAddr(trx.To()), - Src: shared.HandleNullAddr(&from), + Dst: shared.HandleNullAddrPointer(trx.To()), + Src: shared.HandleNullAddr(from), TxHash: trx.Hash().String(), Index: int64(i), } @@ -90,28 +90,33 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert if err := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { return nil, err } - for i, receipt := range receipts { - // If the transaction for this receipt has a "to" address, the above DeriveFields() fails to assign it to the receipt's ContractAddress - // If it doesn't have a "to" address, it correctly derives it and assigns it to to the receipt's ContractAddress - // Weird, right? - if transactions[i].To() != nil { - receipt.ContractAddress = *transactions[i].To() - } + for _, receipt := range receipts { // Extract topic and contract data from the receipt for indexing topicSets := make([][]string, 4) + mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses for _, log := range receipt.Logs { - for i := range topicSets { - if i < len(log.Topics) { - topicSets[i] = append(topicSets[i], log.Topics[i].Hex()) - } + for i, topic := range log.Topics { + topicSets[i] = append(topicSets[i], topic.Hex()) } + mappedContracts[log.Address.String()] = true + } + logContracts := make([]string, 0, len(mappedContracts)) + for addr := range mappedContracts { + logContracts = append(logContracts, addr) + } + contract := shared.HandleNullAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() } rctMeta := ReceiptModel{ - Topic0s: topicSets[0], - Topic1s: topicSets[1], - Topic2s: topicSets[2], - Topic3s: topicSets[3], - Contract: receipt.ContractAddress.Hex(), + Topic0s: topicSets[0], + Topic1s: topicSets[1], + Topic2s: topicSets[2], + Topic3s: topicSets[3], + Contract: contract, + ContractHash: contractHash, + LogContracts: logContracts, } // receipt and rctMeta will have same indexes convertedPayload.Receipts = append(convertedPayload.Receipts, receipt) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index eb941ae7..6ae7d691 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -173,7 +173,7 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response * for i, receipt := range payload.Receipts { // topics is always length 4 topics := [][]string{payload.ReceiptMetaData[i].Topic0s, payload.ReceiptMetaData[i].Topic1s, payload.ReceiptMetaData[i].Topic2s, payload.ReceiptMetaData[i].Topic3s} - if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.Contracts, payload.ReceiptMetaData[i].Contract, trxHashes) { + if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, payload.ReceiptMetaData[i].LogContracts, trxHashes) { receiptBuffer := new(bytes.Buffer) if err := receipt.EncodeRLP(receiptBuffer); err != nil { return err @@ -193,9 +193,9 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response * return nil } -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool { +func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedAddresses []string, actualAddresses []string, wantedTrxHashes []common.Hash) bool { // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go - if len(wantedTopics) == 0 && len(wantedContracts) == 0 && len(wantedTrxHashes) == 0 { + if len(wantedTopics) == 0 && len(wantedAddresses) == 0 && len(wantedTrxHashes) == 0 { return true } // Keep receipts that are from watched txs @@ -205,18 +205,20 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wa } } // If there are no wanted contract addresses, we keep all receipts that match the topic filter - if len(wantedContracts) == 0 { + if len(wantedAddresses) == 0 { if match := filterMatch(wantedTopics, actualTopics); match == true { return true } } // If there are wanted contract addresses to filter on - for _, wantedAddr := range wantedContracts { + for _, wantedAddr := range wantedAddresses { // and this is an address of interest - if wantedAddr == actualContract { - // we keep the receipt if it matches on the topic filter - if match := filterMatch(wantedTopics, actualTopics); match == true { - return true + for _, actualAddr := range actualAddresses { + if wantedAddr == actualAddr { + // we keep the receipt if it matches on the topic filter + if match := filterMatch(wantedTopics, actualTopics); match == true { + return true + } } } } diff --git a/pkg/super_node/eth/filterer_test.go b/pkg/super_node/eth/filterer_test.go index 568f0e2e..82e40bc0 100644 --- a/pkg/super_node/eth/filterer_test.go +++ b/pkg/super_node/eth/filterer_test.go @@ -48,12 +48,14 @@ var _ = Describe("Filterer", func() { Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) var expectedEmptyUncles []ipfs.BlockModel Expect(iplds.Uncles).To(Equal(expectedEmptyUncles)) - Expect(len(iplds.Transactions)).To(Equal(2)) + Expect(len(iplds.Transactions)).To(Equal(3)) Expect(shared.IPLDsContainBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) - Expect(len(iplds.Receipts)).To(Equal(2)) + Expect(shared.IPLDsContainBytes(iplds.Transactions, mocks.MockTransactions.GetRlp(2))).To(BeTrue()) + Expect(len(iplds.Receipts)).To(Equal(3)) Expect(shared.IPLDsContainBytes(iplds.Receipts, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds.Receipts, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) + Expect(shared.IPLDsContainBytes(iplds.Receipts, mocks.MockReceipts.GetRlp(2))).To(BeTrue()) Expect(len(iplds.StateNodes)).To(Equal(2)) for _, stateNode := range iplds.StateNodes { Expect(stateNode.Type).To(Equal(statediff.Leaf)) @@ -74,7 +76,7 @@ var _ = Describe("Filterer", func() { }) It("Applies filters from the provided config.Subscription", func() { - payload1, err := filterer.Filter(rctContractFilter, mocks.MockConvertedPayload) + payload1, err := filterer.Filter(rctAddressFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) iplds1, ok := payload1.(eth.IPLDs) Expect(ok).To(BeTrue()) @@ -86,8 +88,8 @@ var _ = Describe("Filterer", func() { Expect(len(iplds1.StateNodes)).To(Equal(0)) Expect(len(iplds1.Receipts)).To(Equal(1)) Expect(iplds1.Receipts[0]).To(Equal(ipfs.BlockModel{ - Data: mocks.Rct2IPLD.RawData(), - CID: mocks.Rct2IPLD.Cid().String(), + Data: mocks.Rct1IPLD.RawData(), + CID: mocks.Rct1IPLD.Cid().String(), })) payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockConvertedPayload) @@ -106,7 +108,7 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct1IPLD.Cid().String(), })) - payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockConvertedPayload) + payload3, err := filterer.Filter(rctTopicsAndAddressFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) iplds3, ok := payload3.(eth.IPLDs) Expect(ok).To(BeTrue()) @@ -122,7 +124,7 @@ var _ = Describe("Filterer", func() { CID: mocks.Rct1IPLD.Cid().String(), })) - payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockConvertedPayload) + payload4, err := filterer.Filter(rctAddressesAndTopicFilter, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) iplds4, ok := payload4.(eth.IPLDs) Expect(ok).To(BeTrue()) @@ -145,14 +147,16 @@ var _ = Describe("Filterer", func() { Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) Expect(iplds5.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds5.Uncles)).To(Equal(0)) - Expect(len(iplds5.Transactions)).To(Equal(2)) + Expect(len(iplds5.Transactions)).To(Equal(3)) Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) + Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(2))).To(BeTrue()) Expect(len(iplds5.StorageNodes)).To(Equal(0)) Expect(len(iplds5.StateNodes)).To(Equal(0)) - Expect(len(iplds5.Receipts)).To(Equal(2)) + Expect(len(iplds5.Receipts)).To(Equal(3)) Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(0))).To(BeTrue()) Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(1))).To(BeTrue()) + Expect(shared.IPLDsContainBytes(iplds5.Receipts, mocks.MockReceipts.GetRlp(2))).To(BeTrue()) payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) @@ -188,7 +192,7 @@ var _ = Describe("Filterer", func() { CID: mocks.State2IPLD.Cid().String(), })) - payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockConvertedPayload) + payload8, err := filterer.Filter(rctTopicsAndAddressFilterFail, mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) iplds8, ok := payload8.(eth.IPLDs) Expect(ok).To(BeTrue()) diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index bbef66ef..f9723fe7 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -19,15 +19,13 @@ package eth import ( "fmt" - "github.com/ethereum/go-ethereum/crypto" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var ( @@ -92,12 +90,12 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel, nodeID int64) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, eth.header_cids.times_validated + 1) RETURNING id`, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, nodeID, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp).Scan(&headerID) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, 1).Scan(&headerID) return headerID, err } @@ -129,8 +127,9 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa } func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error { - _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, topic0s, topic1s, topic2s, topic3s) VALUES ($1, $2, $3, $4, $5, $6, $7)`, - txID, cidMeta.CID, cidMeta.Contract, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s) + _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) = ($2, $3, $4, $5, $6, $7, $8, $9)`, + txID, cidMeta.CID, cidMeta.Contract, cidMeta.ContractHash, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s, cidMeta.LogContracts) return err } @@ -168,7 +167,7 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, func (in *CIDIndexer) indexStateAccount(tx *sqlx.Tx, stateAccount StateAccountModel, stateID int64) error { _, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`, + ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`, stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) return err } @@ -179,7 +178,7 @@ func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel, storageKey = storageCID.StorageKey } _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type) = ($2, $3, $5)`, + ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type) = ($2, $3, $5)`, stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType) return err } diff --git a/pkg/super_node/eth/indexer_test.go b/pkg/super_node/eth/indexer_test.go index 0a0e89bf..471a3c2b 100644 --- a/pkg/super_node/eth/indexer_test.go +++ b/pkg/super_node/eth/indexer_test.go @@ -68,9 +68,10 @@ var _ = Describe("Indexer", func() { WHERE header_cids.block_number = $1` err = db.Select(&trxs, pgStr, 1) Expect(err).ToNot(HaveOccurred()) - Expect(len(trxs)).To(Equal(2)) + Expect(len(trxs)).To(Equal(3)) Expect(shared.ListContainsString(trxs, mocks.Trx1CID.String())).To(BeTrue()) Expect(shared.ListContainsString(trxs, mocks.Trx2CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(trxs, mocks.Trx3CID.String())).To(BeTrue()) // check receipts were properly indexed rcts := make([]string, 0) pgStr = `SELECT receipt_cids.cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids @@ -79,9 +80,10 @@ var _ = Describe("Indexer", func() { AND header_cids.block_number = $1` err = db.Select(&rcts, pgStr, 1) Expect(err).ToNot(HaveOccurred()) - Expect(len(rcts)).To(Equal(2)) + Expect(len(rcts)).To(Equal(3)) Expect(shared.ListContainsString(rcts, mocks.Rct1CID.String())).To(BeTrue()) Expect(shared.ListContainsString(rcts, mocks.Rct2CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(rcts, mocks.Rct3CID.String())).To(BeTrue()) // check that state nodes were properly indexed stateNodes := make([]eth.StateNodeModel, 0) pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index dfefcc71..aca169fc 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -53,64 +53,84 @@ var ( Difficulty: big.NewInt(5000000), Extra: []byte{}, } - MockTransactions, MockReceipts, senderAddr = createTransactionsAndReceipts() + MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts() ReceiptsRlp, _ = rlp.EncodeToBytes(MockReceipts) MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header()) Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") + ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce()) + ContractHash = crypto.Keccak256Hash(ContractAddress.Bytes()).String() mockTopic11 = common.HexToHash("0x04") mockTopic12 = common.HexToHash("0x06") mockTopic21 = common.HexToHash("0x05") mockTopic22 = common.HexToHash("0x07") MockLog1 = &types.Log{ - Topics: []common.Hash{mockTopic11, mockTopic12}, - Data: []byte{}, + Address: Address, + Topics: []common.Hash{mockTopic11, mockTopic12}, + Data: []byte{}, } MockLog2 = &types.Log{ - Topics: []common.Hash{mockTopic21, mockTopic22}, - Data: []byte{}, + Address: AnotherAddress, + Topics: []common.Hash{mockTopic21, mockTopic22}, + Data: []byte{}, } HeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256) Trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(0), multihash.KECCAK_256) Trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(1), multihash.KECCAK_256) + Trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(2), multihash.KECCAK_256) Rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(0), multihash.KECCAK_256) Rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(1), multihash.KECCAK_256) + Rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(2), multihash.KECCAK_256) State1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, ContractLeafNode, multihash.KECCAK_256) State2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, AccountLeafNode, multihash.KECCAK_256) StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256) MockTrxMeta = []eth.TxModel{ { CID: "", // This is empty until we go to publish to ipfs - Src: senderAddr.Hex(), + Src: SenderAddr.Hex(), Dst: Address.String(), Index: 0, TxHash: MockTransactions[0].Hash().String(), }, { CID: "", - Src: senderAddr.Hex(), + Src: SenderAddr.Hex(), Dst: AnotherAddress.String(), Index: 1, TxHash: MockTransactions[1].Hash().String(), }, + { + CID: "", + Src: SenderAddr.Hex(), + Dst: "", + Index: 2, + TxHash: MockTransactions[2].Hash().String(), + }, } MockTrxMetaPostPublsh = []eth.TxModel{ { CID: Trx1CID.String(), // This is empty until we go to publish to ipfs - Src: senderAddr.Hex(), + Src: SenderAddr.Hex(), Dst: Address.String(), Index: 0, TxHash: MockTransactions[0].Hash().String(), }, { CID: Trx2CID.String(), - Src: senderAddr.Hex(), + Src: SenderAddr.Hex(), Dst: AnotherAddress.String(), Index: 1, TxHash: MockTransactions[1].Hash().String(), }, + { + CID: Trx3CID.String(), + Src: SenderAddr.Hex(), + Dst: "", + Index: 2, + TxHash: MockTransactions[2].Hash().String(), + }, } MockRctMeta = []eth.ReceiptModel{ { @@ -121,7 +141,11 @@ var ( Topic1s: []string{ mockTopic12.String(), }, - Contract: Address.String(), + Contract: "", + ContractHash: "", + LogContracts: []string{ + Address.String(), + }, }, { CID: "", @@ -131,7 +155,17 @@ var ( Topic1s: []string{ mockTopic22.String(), }, - Contract: AnotherAddress.String(), + Contract: "", + ContractHash: "", + LogContracts: []string{ + AnotherAddress.String(), + }, + }, + { + CID: "", + Contract: ContractAddress.String(), + ContractHash: ContractHash, + LogContracts: []string{}, }, } MockRctMetaPostPublish = []eth.ReceiptModel{ @@ -143,7 +177,11 @@ var ( Topic1s: []string{ mockTopic12.String(), }, - Contract: Address.String(), + Contract: "", + ContractHash: "", + LogContracts: []string{ + Address.String(), + }, }, { CID: Rct2CID.String(), @@ -153,7 +191,17 @@ var ( Topic1s: []string{ mockTopic22.String(), }, - Contract: AnotherAddress.String(), + Contract: "", + ContractHash: "", + LogContracts: []string{ + AnotherAddress.String(), + }, + }, + { + CID: Rct3CID.String(), + Contract: ContractAddress.String(), + ContractHash: ContractHash, + LogContracts: []string{}, }, } @@ -171,7 +219,6 @@ var ( contractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0" contractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea") contractPathHash = crypto.Keccak256Hash([]byte{'\x06'}) - ContractAddress = common.HexToAddress("0x703c4b2bD70c169f5717101CaeE543299Fc946C7") ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress) ContractAccount, _ = rlp.EncodeToBytes(state.Account{ Nonce: nonce1, @@ -310,6 +357,7 @@ var ( ReceiptCIDs: map[common.Hash]eth.ReceiptModel{ MockTransactions[0].Hash(): MockRctMetaPostPublish[0], MockTransactions[1].Hash(): MockRctMetaPostPublish[1], + MockTransactions[2].Hash(): MockRctMetaPostPublish[2], }, StateNodeCIDs: MockStateMetaPostPublish, StorageNodeCIDs: map[common.Hash][]eth.StorageNodeModel{ @@ -353,6 +401,7 @@ var ( UncleRoot: MockBlock.UncleHash().String(), Bloom: MockBlock.Bloom().Bytes(), Timestamp: MockBlock.Time(), + TimesValidated: 1, }, Transactions: MockTrxMetaPostPublsh, Receipts: MockRctMetaPostPublish, @@ -372,8 +421,10 @@ var ( HeaderIPLD, _ = blocks.NewBlockWithCid(MockHeaderRlp, HeaderCID) Trx1IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(0), Trx1CID) Trx2IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(1), Trx2CID) + Trx3IPLD, _ = blocks.NewBlockWithCid(MockTransactions.GetRlp(2), Trx3CID) Rct1IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(0), Rct1CID) Rct2IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(1), Rct2CID) + Rct3IPLD, _ = blocks.NewBlockWithCid(MockReceipts.GetRlp(2), Rct3CID) State1IPLD, _ = blocks.NewBlockWithCid(ContractLeafNode, State1CID) State2IPLD, _ = blocks.NewBlockWithCid(AccountLeafNode, State2CID) StorageIPLD, _ = blocks.NewBlockWithCid(StorageLeafNode, StorageCID) @@ -393,6 +444,10 @@ var ( Data: Trx2IPLD.RawData(), CID: Trx2IPLD.Cid().String(), }, + { + Data: Trx3IPLD.RawData(), + CID: Trx3IPLD.Cid().String(), + }, }, Receipts: []ipfs.BlockModel{ { @@ -403,6 +458,10 @@ var ( Data: Rct2IPLD.RawData(), CID: Rct2IPLD.Cid().String(), }, + { + Data: Rct3IPLD.RawData(), + CID: Rct3IPLD.Cid().String(), + }, }, StateNodes: []eth2.StateNode{ { @@ -444,6 +503,7 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common // make transactions trx1 := types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), []byte{}) trx2 := types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), []byte{}) + trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), []byte{0, 1, 2, 3, 4, 5}) transactionSigner := types.MakeSigner(params.MainnetChainConfig, BlockNumber) mockCurve := elliptic.P256() mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) @@ -458,7 +518,11 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common if err != nil { log.Fatal(err) } - senderAddr, err := types.Sender(transactionSigner, signedTrx1) // same for both trx + signedTrx3, err := types.SignTx(trx3, transactionSigner, mockPrvKey) + if err != nil { + log.Fatal(err) + } + SenderAddr, err := types.Sender(transactionSigner, signedTrx1) // same for both trx if err != nil { log.Fatal(err) } @@ -469,5 +533,8 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common mockReceipt2 := types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) mockReceipt2.Logs = []*types.Log{MockLog2} mockReceipt2.TxHash = signedTrx2.Hash() - return types.Transactions{signedTrx1, signedTrx2}, types.Receipts{mockReceipt1, mockReceipt2}, senderAddr + mockReceipt3 := types.NewReceipt(common.HexToHash("0x2").Bytes(), false, 75) + mockReceipt3.Logs = []*types.Log{} + mockReceipt3.TxHash = signedTrx3.Hash() + return types.Transactions{signedTrx1, signedTrx2, signedTrx3}, types.Receipts{mockReceipt1, mockReceipt2, mockReceipt3}, SenderAddr } diff --git a/pkg/super_node/eth/models.go b/pkg/super_node/eth/models.go index b2a84b44..5f73bfe1 100644 --- a/pkg/super_node/eth/models.go +++ b/pkg/super_node/eth/models.go @@ -34,6 +34,7 @@ type HeaderModel struct { RctRoot string `db:"receipt_root"` Bloom []byte `db:"bloom"` Timestamp uint64 `db:"timestamp"` + TimesValidated int64 `db:"times_validated"` } // UncleModel is the db model for eth.uncle_cids @@ -59,14 +60,16 @@ type TxModel struct { // ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { - ID int64 `db:"id"` - TxID int64 `db:"tx_id"` - CID string `db:"cid"` - Contract string `db:"contract"` - Topic0s pq.StringArray `db:"topic0s"` - Topic1s pq.StringArray `db:"topic1s"` - Topic2s pq.StringArray `db:"topic2s"` - Topic3s pq.StringArray `db:"topic3s"` + ID int64 `db:"id"` + TxID int64 `db:"tx_id"` + CID string `db:"cid"` + Contract string `db:"contract"` + ContractHash string `db:"contract_hash"` + LogContracts pq.StringArray `db:"log_contracts"` + Topic0s pq.StringArray `db:"topic0s"` + Topic1s pq.StringArray `db:"topic1s"` + Topic2s pq.StringArray `db:"topic2s"` + Topic3s pq.StringArray `db:"topic3s"` } // StateNodeModel is the db model for eth.state_cids diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 0d09c403..375b449c 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -188,12 +188,14 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr return nil, err } rctCids[rct.TxHash] = ReceiptModel{ - CID: cid, - Contract: receiptMeta[i].Contract, - Topic0s: receiptMeta[i].Topic0s, - Topic1s: receiptMeta[i].Topic1s, - Topic2s: receiptMeta[i].Topic2s, - Topic3s: receiptMeta[i].Topic3s, + CID: cid, + Contract: receiptMeta[i].Contract, + ContractHash: receiptMeta[i].ContractHash, + Topic0s: receiptMeta[i].Topic0s, + Topic1s: receiptMeta[i].Topic1s, + Topic2s: receiptMeta[i].Topic2s, + Topic3s: receiptMeta[i].Topic3s, + LogContracts: receiptMeta[i].LogContracts, } } for _, rctNode := range receiptTrie { diff --git a/pkg/super_node/eth/publisher_test.go b/pkg/super_node/eth/publisher_test.go index 62c517de..ca0aa3d6 100644 --- a/pkg/super_node/eth/publisher_test.go +++ b/pkg/super_node/eth/publisher_test.go @@ -55,10 +55,12 @@ var _ = Describe("Publisher", func() { mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ common.BytesToHash(mocks.Trx1IPLD.RawData()): mocks.Trx1CID.String(), common.BytesToHash(mocks.Trx2IPLD.RawData()): mocks.Trx2CID.String(), + common.BytesToHash(mocks.Trx3IPLD.RawData()): mocks.Trx3CID.String(), } mockRctDagPutter.CIDsToReturn = map[common.Hash]string{ common.BytesToHash(mocks.Rct1IPLD.RawData()): mocks.Rct1CID.String(), common.BytesToHash(mocks.Rct2IPLD.RawData()): mocks.Rct2CID.String(), + common.BytesToHash(mocks.Rct3IPLD.RawData()): mocks.Rct3CID.String(), } mockStateDagPutter.CIDsToReturn = map[common.Hash]string{ common.BytesToHash(mocks.State1IPLD.RawData()): mocks.State1CID.String(), @@ -86,12 +88,14 @@ var _ = Describe("Publisher", func() { Expect(cidPayload.HeaderCID.Reward).To(Equal(mocks.MockCIDPayload.HeaderCID.Reward)) Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs)) Expect(cidPayload.HeaderCID).To(Equal(mocks.MockCIDPayload.HeaderCID)) - Expect(len(cidPayload.TransactionCIDs)).To(Equal(2)) + Expect(len(cidPayload.TransactionCIDs)).To(Equal(3)) Expect(cidPayload.TransactionCIDs[0]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[0])) Expect(cidPayload.TransactionCIDs[1]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[1])) - Expect(len(cidPayload.ReceiptCIDs)).To(Equal(2)) + Expect(cidPayload.TransactionCIDs[2]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[2])) + Expect(len(cidPayload.ReceiptCIDs)).To(Equal(3)) Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[0].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[0].Hash()])) Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[1].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[1].Hash()])) + Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[2].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[2].Hash()])) Expect(len(cidPayload.StateNodeCIDs)).To(Equal(2)) Expect(cidPayload.StateNodeCIDs[0]).To(Equal(mocks.MockCIDPayload.StateNodeCIDs[0])) Expect(cidPayload.StateNodeCIDs[1]).To(Equal(mocks.MockCIDPayload.StateNodeCIDs[1])) diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 125b0769..5baa2c72 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -214,19 +214,19 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, - receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, + receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id AND transaction_cids.header_id = header_cids.id AND header_cids.id = $1` id := 2 args = append(args, headerID) - if len(rctFilter.Contracts) > 0 { - // Filter on contract addresses if there are any - pgStr += fmt.Sprintf(` AND ((receipt_cids.contract = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(rctFilter.Contracts)) + if len(rctFilter.LogAddresses) > 0 { + // Filter on log contract addresses if there are any + pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id) + args = append(args, pq.Array(rctFilter.LogAddresses)) id++ // Filter on topics if there are any if hasTopics(rctFilter.Topics) { @@ -294,9 +294,9 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, - receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, + receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id AND transaction_cids.header_id = header_cids.id` @@ -311,10 +311,10 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b args = append(args, blockHash.String()) id++ } - if len(rctFilter.Contracts) > 0 { - // Filter on contract addresses if there are any - pgStr += fmt.Sprintf(` AND ((receipt_cids.contract = ANY($%d::VARCHAR(66)[])`, id) - args = append(args, pq.Array(rctFilter.Contracts)) + if len(rctFilter.LogAddresses) > 0 { + // Filter on log contract addresses if there are any + pgStr += fmt.Sprintf(` AND ((receipt_cids.log_contracts && $%d::VARCHAR(66)[]`, id) + args = append(args, pq.Array(rctFilter.LogAddresses)) id++ // Filter on topics if there are any if hasTopics(rctFilter.Topics) { @@ -445,7 +445,8 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db -func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { +// it finds the union of heights where no data exists and where the times_validated is lower than the validation level +func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1 LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number @@ -455,18 +456,45 @@ func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { Start uint64 `db:"start"` Stop uint64 `db:"stop"` }, 0) - err := ecr.db.Select(&results, pgStr) - if err != nil { + if err := ecr.db.Select(&results, pgStr); err != nil { return nil, err } - gaps := make([]shared.Gap, len(results)) + emptyGaps := make([]shared.Gap, len(results)) for i, res := range results { - gaps[i] = shared.Gap{ + emptyGaps[i] = shared.Gap{ Start: res.Start, Stop: res.Stop, } } - return gaps, nil + + // Find sections of blocks where we are below the validation level + // There will be no overlap between these "gaps" and the ones above + pgStr = `SELECT block_number FROM eth.header_cids + WHERE times_validated < $1 + ORDER BY block_number` + var heights []uint64 + if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil { + return nil, err + } + if len(heights) == 0 { + return emptyGaps, nil + } + validationGaps := make([]shared.Gap, 0) + start := heights[0] + lastHeight := start + for _, height := range heights[1:] { + if height == lastHeight+1 { + lastHeight = height + continue + } + validationGaps = append(validationGaps, shared.Gap{ + Start: start, + Stop: lastHeight, + }) + start = height + lastHeight = start + } + return append(emptyGaps, validationGaps...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash @@ -586,9 +614,9 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ( // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, - receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, - receipt_cids.topic2s, receipt_cids.topic3s + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, + receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids WHERE tx_id = ANY($1::INTEGER[]) AND receipt_cids.tx_id = transaction_cids.id diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index d2de0e46..71f4b1d8 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -41,7 +41,7 @@ var ( StateFilter: eth.StateFilter{}, StorageFilter: eth.StorageFilter{}, } - rctContractFilter = ð.SubscriptionSettings{ + rctAddressFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -51,7 +51,7 @@ var ( Off: true, }, ReceiptFilter: eth.ReceiptFilter{ - Contracts: []string{mocks.AnotherAddress.String()}, + LogAddresses: []string{mocks.Address.String()}, }, StateFilter: eth.StateFilter{ Off: true, @@ -79,7 +79,7 @@ var ( Off: true, }, } - rctTopicsAndContractFilter = ð.SubscriptionSettings{ + rctTopicsAndAddressFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -93,7 +93,7 @@ var ( {"0x0000000000000000000000000000000000000000000000000000000000000004"}, {"0x0000000000000000000000000000000000000000000000000000000000000006"}, }, - Contracts: []string{mocks.Address.String()}, + LogAddresses: []string{mocks.Address.String()}, }, StateFilter: eth.StateFilter{ Off: true, @@ -102,7 +102,7 @@ var ( Off: true, }, } - rctTopicsAndContractFilterFail = ð.SubscriptionSettings{ + rctTopicsAndAddressFilterFail = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -116,7 +116,7 @@ var ( {"0x0000000000000000000000000000000000000000000000000000000000000004"}, {"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt }, - Contracts: []string{mocks.Address.String()}, + LogAddresses: []string{mocks.Address.String()}, }, StateFilter: eth.StateFilter{ Off: true, @@ -125,7 +125,7 @@ var ( Off: true, }, } - rctContractsAndTopicFilter = ð.SubscriptionSettings{ + rctAddressesAndTopicFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), HeaderFilter: eth.HeaderFilter{ @@ -135,8 +135,8 @@ var ( Off: true, }, ReceiptFilter: eth.ReceiptFilter{ - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}}, - Contracts: []string{mocks.Address.String(), mocks.AnotherAddress.String()}, + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}}, + LogAddresses: []string{mocks.Address.String(), mocks.AnotherAddress.String()}, }, StateFilter: eth.StateFilter{ Off: true, @@ -153,9 +153,9 @@ var ( }, TxFilter: eth.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter ReceiptFilter: eth.ReceiptFilter{ - MatchTxs: true, - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + MatchTxs: true, + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have + LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, StateFilter: eth.StateFilter{ Off: true, @@ -174,9 +174,9 @@ var ( Dst: []string{mocks.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt }, ReceiptFilter: eth.ReceiptFilter{ - MatchTxs: true, - Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have - Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have + MatchTxs: true, + Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have + LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, StateFilter: eth.StateFilter{ Off: true, @@ -240,12 +240,14 @@ var _ = Describe("Retriever", func() { expectedHeaderCID.ID = cidWrapper.Header.ID expectedHeaderCID.NodeID = cidWrapper.Header.NodeID Expect(cidWrapper.Header).To(Equal(expectedHeaderCID)) - Expect(len(cidWrapper.Transactions)).To(Equal(2)) + Expect(len(cidWrapper.Transactions)).To(Equal(3)) Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) - Expect(len(cidWrapper.Receipts)).To(Equal(2)) + Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[2].CID)).To(BeTrue()) + Expect(len(cidWrapper.Receipts)).To(Equal(3)) Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[0].CID)).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[1].CID)).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cidWrapper.Receipts, mocks.MockCIDWrapper.Receipts[2].CID)).To(BeTrue()) Expect(len(cidWrapper.StateNodes)).To(Equal(2)) for _, stateNode := range cidWrapper.StateNodes { if stateNode.CID == mocks.State1CID.String() { @@ -267,7 +269,7 @@ var _ = Describe("Retriever", func() { }) It("Applies filters from the provided config.Subscription", func() { - cids1, empty, err := retriever.Retrieve(rctContractFilter, 1) + cids1, empty, err := retriever.Retrieve(rctAddressFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids1)).To(Equal(1)) @@ -279,7 +281,7 @@ var _ = Describe("Retriever", func() { Expect(len(cidWrapper1.StateNodes)).To(Equal(0)) Expect(len(cidWrapper1.StorageNodes)).To(Equal(0)) Expect(len(cidWrapper1.Receipts)).To(Equal(1)) - expectedReceiptCID := mocks.MockCIDWrapper.Receipts[1] + expectedReceiptCID := mocks.MockCIDWrapper.Receipts[0] expectedReceiptCID.ID = cidWrapper1.Receipts[0].ID expectedReceiptCID.TxID = cidWrapper1.Receipts[0].TxID Expect(cidWrapper1.Receipts[0]).To(Equal(expectedReceiptCID)) @@ -301,7 +303,7 @@ var _ = Describe("Retriever", func() { expectedReceiptCID.TxID = cidWrapper2.Receipts[0].TxID Expect(cidWrapper2.Receipts[0]).To(Equal(expectedReceiptCID)) - cids3, empty, err := retriever.Retrieve(rctTopicsAndContractFilter, 1) + cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids3)).To(Equal(1)) @@ -318,7 +320,7 @@ var _ = Describe("Retriever", func() { expectedReceiptCID.TxID = cidWrapper3.Receipts[0].TxID Expect(cidWrapper3.Receipts[0]).To(Equal(expectedReceiptCID)) - cids4, empty, err := retriever.Retrieve(rctContractsAndTopicFilter, 1) + cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) Expect(len(cids4)).To(Equal(1)) @@ -343,14 +345,16 @@ var _ = Describe("Retriever", func() { Expect(ok).To(BeTrue()) Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) Expect(cidWrapper5.Header).To(Equal(eth.HeaderModel{})) - Expect(len(cidWrapper5.Transactions)).To(Equal(2)) + Expect(len(cidWrapper5.Transactions)).To(Equal(3)) Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx1CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx2CID.String())).To(BeTrue()) + Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx3CID.String())).To(BeTrue()) Expect(len(cidWrapper5.StateNodes)).To(Equal(0)) Expect(len(cidWrapper5.StorageNodes)).To(Equal(0)) - Expect(len(cidWrapper5.Receipts)).To(Equal(2)) + Expect(len(cidWrapper5.Receipts)).To(Equal(3)) Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct1CID.String())).To(BeTrue()) Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct2CID.String())).To(BeTrue()) + Expect(eth.ReceiptModelsContainsCID(cidWrapper5.Receipts, mocks.Rct3CID.String())).To(BeTrue()) cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) @@ -394,7 +398,7 @@ var _ = Describe("Retriever", func() { Path: []byte{'\x0c'}, })) - _, empty, err = retriever.Retrieve(rctTopicsAndContractFilterFail, 1) + _, empty, err = retriever.Retrieve(rctTopicsAndAddressFilterFail, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).To(BeTrue()) }) @@ -480,7 +484,7 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) - gaps, err := retriever.RetrieveGapsInData() + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(0)) }) @@ -490,11 +494,29 @@ var _ = Describe("Retriever", func() { payload.HeaderCID.BlockNumber = "5" err := repo.Index(&payload) Expect(err).ToNot(HaveOccurred()) - gaps, err := retriever.RetrieveGapsInData() + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(0)) }) + It("Can handle single block gaps", func() { + payload1 := *mocks.MockCIDPayload + payload1.HeaderCID.BlockNumber = "2" + payload2 := payload1 + payload2.HeaderCID.BlockNumber = "4" + err := repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData(1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(gaps)).To(Equal(1)) + Expect(gaps[0].Start).To(Equal(uint64(3))) + Expect(gaps[0].Stop).To(Equal(uint64(3))) + }) + It("Finds gap between two entries", func() { payload1 := *mocks.MockCIDPayload payload1.HeaderCID.BlockNumber = "1010101" @@ -504,7 +526,7 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) - gaps, err := retriever.RetrieveGapsInData() + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(1)) Expect(gaps[0].Start).To(Equal(uint64(6))) @@ -536,7 +558,7 @@ var _ = Describe("Retriever", func() { Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload6) Expect(err).ToNot(HaveOccurred()) - gaps, err := retriever.RetrieveGapsInData() + gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(3)) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) diff --git a/pkg/super_node/eth/subscription_config.go b/pkg/super_node/eth/subscription_config.go index a6d563e6..d8ac70d0 100644 --- a/pkg/super_node/eth/subscription_config.go +++ b/pkg/super_node/eth/subscription_config.go @@ -54,9 +54,9 @@ type TxFilter struct { type ReceiptFilter struct { Off bool // TODO: change this so that we filter for receipts first and we always return the corresponding transaction - MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions - Contracts []string - Topics [][]string + MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions + LogAddresses []string // receipt contains logs from the provided addresses + Topics [][]string } // StateFilter contains filter settings for state @@ -103,10 +103,10 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { topics[2] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic2s") topics[3] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic3s") sc.ReceiptFilter = ReceiptFilter{ - Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"), - MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"), - Contracts: viper.GetStringSlice("superNode.ethSubscription.receiptFilter.contracts"), - Topics: topics, + Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"), + MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"), + LogAddresses: viper.GetStringSlice("superNode.ethSubscription.receiptFilter.contracts"), + Topics: topics, } // Below defaults to two false, and a slice of length 0 // Which means we get all state leafs by default, but no intermediate nodes diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index 04c5b118..64c5a266 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -30,20 +30,22 @@ import ( // Env variables const ( - RESYNC_CHAIN = "RESYNC_CHAIN" - RESYNC_START = "RESYNC_START" - RESYNC_STOP = "RESYNC_STOP" - RESYNC_BATCH_SIZE = "RESYNC_BATCH_SIZE" - RESYNC_BATCH_NUMBER = "RESYNC_BATCH_NUMBER" - RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE" - RESYNC_TYPE = "RESYNC_TYPE" + RESYNC_CHAIN = "RESYNC_CHAIN" + RESYNC_START = "RESYNC_START" + RESYNC_STOP = "RESYNC_STOP" + RESYNC_BATCH_SIZE = "RESYNC_BATCH_SIZE" + RESYNC_BATCH_NUMBER = "RESYNC_BATCH_NUMBER" + RESYNC_CLEAR_OLD_CACHE = "RESYNC_CLEAR_OLD_CACHE" + RESYNC_TYPE = "RESYNC_TYPE" + RESYNC_RESET_VALIDATION = "RESYNC_RESET_VALIDATION" ) // Config holds the parameters needed to perform a resync type Config struct { - Chain shared.ChainType // The type of resync to perform - ResyncType shared.DataType // The type of data to resync - ClearOldCache bool // Resync will first clear all the data within the range + Chain shared.ChainType // The type of resync to perform + ResyncType shared.DataType // The type of data to resync + ClearOldCache bool // Resync will first clear all the data within the range + ResetValidation bool // If true, resync will reset the validation level to 0 for the given range // DB info DB *postgres.DB @@ -73,11 +75,13 @@ func NewReSyncConfig() (*Config, error) { viper.BindEnv("bitcoin.httpPath", shared.BTC_HTTP_PATH) viper.BindEnv("resync.batchSize", RESYNC_BATCH_SIZE) viper.BindEnv("resync.batchNumber", RESYNC_BATCH_NUMBER) + viper.BindEnv("resync.resetValidation", RESYNC_RESET_VALIDATION) start := uint64(viper.GetInt64("resync.start")) stop := uint64(viper.GetInt64("resync.stop")) c.Ranges = [][2]uint64{{start, stop}} c.ClearOldCache = viper.GetBool("resync.clearOldCache") + c.ResetValidation = viper.GetBool("resync.resetValidation") c.IPFSPath, err = shared.GetIPFSPath() if err != nil { diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index 18dacb21..0b9fd1ec 100644 --- a/pkg/super_node/resync/service.go +++ b/pkg/super_node/resync/service.go @@ -58,6 +58,8 @@ type Service struct { ranges [][2]uint64 // Flag to turn on or off old cache destruction clearOldCache bool + // Flag to turn on or off validation level reset + resetValidation bool } // NewResyncService creates and returns a resync service from the provided settings @@ -95,23 +97,30 @@ func NewResyncService(settings *Config) (Resync, error) { batchNumber = super_node.DefaultMaxBatchNumber } return &Service{ - Indexer: indexer, - Converter: converter, - Publisher: publisher, - Retriever: retriever, - Fetcher: fetcher, - Cleaner: cleaner, - BatchSize: batchSize, - BatchNumber: int64(batchNumber), - QuitChan: settings.Quit, - chain: settings.Chain, - ranges: settings.Ranges, - data: settings.ResyncType, - clearOldCache: settings.ClearOldCache, + Indexer: indexer, + Converter: converter, + Publisher: publisher, + Retriever: retriever, + Fetcher: fetcher, + Cleaner: cleaner, + BatchSize: batchSize, + BatchNumber: int64(batchNumber), + QuitChan: settings.Quit, + chain: settings.Chain, + ranges: settings.Ranges, + data: settings.ResyncType, + clearOldCache: settings.ClearOldCache, + resetValidation: settings.ResetValidation, }, nil } func (rs *Service) Resync() error { + if rs.resetValidation { + logrus.Infof("resetting validation level") + if err := rs.Cleaner.ResetValidation(rs.ranges); err != nil { + return fmt.Errorf("validation reset failed: %v", err) + } + } if rs.clearOldCache { logrus.Infof("cleaning out old data from Postgres") if err := rs.Cleaner.Clean(rs.ranges, rs.data); err != nil { diff --git a/pkg/super_node/shared/functions.go b/pkg/super_node/shared/functions.go index efb1cd64..ef338fb7 100644 --- a/pkg/super_node/shared/functions.go +++ b/pkg/super_node/shared/functions.go @@ -20,6 +20,7 @@ import ( "bytes" "github.com/ethereum/go-ethereum/common" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) @@ -53,10 +54,18 @@ func ListContainsGap(gapList []Gap, gap Gap) bool { return false } -// HandleNullAddr converts a nil pointer to an address to a zero-valued hex address string -func HandleNullAddr(to *common.Address) string { +// HandleNullAddrPointer will return an emtpy string for a nil address pointer +func HandleNullAddrPointer(to *common.Address) string { if to == nil { - return "0x0000000000000000000000000000000000000000000000000000000000000000" + return "" + } + return to.Hex() +} + +// HandleNullAddr will return an empty string for a a null address +func HandleNullAddr(to common.Address) string { + if to.Hex() == "0x0000000000000000000000000000000000000000" { + return "" } return to.Hex() } diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 39393919..9cf8abff 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -57,7 +57,7 @@ type CIDRetriever interface { Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDsForFetching, bool, error) RetrieveFirstBlockNumber() (int64, error) RetrieveLastBlockNumber() (int64, error) - RetrieveGapsInData() ([]Gap, error) + RetrieveGapsInData(validationLevel int) ([]Gap, error) } // IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper @@ -79,6 +79,7 @@ type DagPutter interface { // Cleaner is for cleaning out data from the cache within the given ranges type Cleaner interface { Clean(rngs [][2]uint64, t DataType) error + ResetValidation(rngs [][2]uint64) error } // SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain diff --git a/pkg/super_node/shared/mocks/retriever.go b/pkg/super_node/shared/mocks/retriever.go index 93efc9a5..d899d0b2 100644 --- a/pkg/super_node/shared/mocks/retriever.go +++ b/pkg/super_node/shared/mocks/retriever.go @@ -46,7 +46,7 @@ func (mcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { } // RetrieveGapsInData mock method -func (mcr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { +func (mcr *CIDRetriever) RetrieveGapsInData(int) ([]shared.Gap, error) { mcr.CalledTimes++ return mcr.GapsToRetrieve, mcr.GapsToRetrieveErr } diff --git a/pkg/watcher/eth/converter.go b/pkg/watcher/eth/converter.go index 98e4702b..b08dbb31 100644 --- a/pkg/watcher/eth/converter.go +++ b/pkg/watcher/eth/converter.go @@ -18,6 +18,7 @@ package eth import ( "fmt" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -94,8 +95,8 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error) } // Tx data cids.TransactionCIDs[i] = eth.TxModel{ - Dst: shared.HandleNullAddr(tx.To()), - Src: shared.HandleNullAddr(&from), + Dst: shared.HandleNullAddrPointer(tx.To()), + Src: shared.HandleNullAddr(from), TxHash: tx.Hash().String(), Index: int64(i), CID: txIPLD.CID, @@ -115,25 +116,32 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error) } for i, receipt := range receipts { matchedTx := transactions[i] - if matchedTx.To() != nil { - receipt.ContractAddress = *transactions[i].To() - } topicSets := make([][]string, 4) + mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses for _, log := range receipt.Logs { - for i := range topicSets { - if i < len(log.Topics) { - topicSets[i] = append(topicSets[i], log.Topics[i].Hex()) - } + for i, topic := range log.Topics { + topicSets[i] = append(topicSets[i], topic.Hex()) } + mappedContracts[log.Address.String()] = true + } + logContracts := make([]string, 0, len(mappedContracts)) + for addr := range mappedContracts { + logContracts = append(logContracts, addr) + } + contract := shared.HandleNullAddr(receipt.ContractAddress) + var contractHash string + if contract != "" { + contractHash = crypto.Keccak256Hash(common.Hex2Bytes(contract)).String() } // Rct data cids.ReceiptCIDs[matchedTx.Hash()] = eth.ReceiptModel{ - CID: ethIPLDs.Receipts[i].CID, - Topic0s: topicSets[0], - Topic1s: topicSets[1], - Topic2s: topicSets[2], - Topic3s: topicSets[3], - Contract: receipt.ContractAddress.Hex(), + CID: ethIPLDs.Receipts[i].CID, + Topic0s: topicSets[0], + Topic1s: topicSets[1], + Topic2s: topicSets[2], + Topic3s: topicSets[3], + ContractHash: contractHash, + LogContracts: logContracts, } } minerReward := common2.CalcEthBlockReward(&header, uncles, transactions, receipts)