diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index c13f0935..4ff3dede 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -67,7 +67,11 @@ func streamEthSubscription() { payloadChan := make(chan super_node.SubscriptionPayload, 20000) // Subscribe to the super node service with the given config/filter parameters - sub, err := str.Stream(payloadChan, ethSubConfig) + rlpParams, err := rlp.EncodeToBytes(ethSubConfig) + if err != nil { + logWithCommand.Fatal(err) + } + sub, err := str.Stream(payloadChan, rlpParams) if err != nil { logWithCommand.Fatal(err) } @@ -83,17 +87,16 @@ func streamEthSubscription() { var ethData eth.IPLDs if err := rlp.DecodeBytes(payload.Data, ðData); err != nil { logWithCommand.Error(err) + continue } - for _, headerRlp := range ethData.Headers { - var header types.Header - err = rlp.Decode(bytes.NewBuffer(headerRlp.Data), &header) - if err != nil { - logWithCommand.Error(err) - continue - } - fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex()) - fmt.Printf("header: %v\n", header) + var header types.Header + err = rlp.Decode(bytes.NewBuffer(ethData.Header.Data), &header) + if err != nil { + logWithCommand.Error(err) + continue } + fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex()) + fmt.Printf("header: %v\n", header) for _, trxRlp := range ethData.Transactions { var trx types.Transaction buff := bytes.NewBuffer(trxRlp.Data) @@ -107,7 +110,7 @@ func streamEthSubscription() { fmt.Printf("trx: %v\n", trx) } for _, rctRlp := range ethData.Receipts { - var rct types.ReceiptForStorage + var rct types.Receipt buff := bytes.NewBuffer(rctRlp.Data) stream := rlp.NewStream(buff, 0) err = rct.DecodeRLP(stream) diff --git a/cmd/watch.go b/cmd/watch.go new file mode 100644 index 00000000..0978e425 --- /dev/null +++ b/cmd/watch.go @@ -0,0 +1,43 @@ +// Copyright © 2020 Vulcanize, Inc +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package cmd + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +// watchCmd represents the watch command +var watchCmd = &cobra.Command{ + Use: "watch", + Short: "Watch and transform data from a chain source", + Long: `This command allows one to configure a set of wasm functions and SQL trigger functions +that call them to watch and transform data from the specified chain source. + +A watcher is composed of four parts: +1) Go execution engine- this command- which fetches raw chain data and adds it to the Postres queued ready data tables +2) TOML config file which specifies what subset of chain data to fetch and from where and contains references to the below +3) Set of WASM binaries which are loaded into Postgres and used by +4) Set of PostgreSQL trigger functions which automatically act on data as it is inserted into the queued ready data tables`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("watch called") + }, +} + +func init() { + rootCmd.AddCommand(watchCmd) +} diff --git a/db/migrations/00024_create_eth_queued_data_table.sql b/db/migrations/00024_create_eth_queued_data_table.sql new file mode 100644 index 00000000..87f58ad8 --- /dev/null +++ b/db/migrations/00024_create_eth_queued_data_table.sql @@ -0,0 +1,9 @@ +-- +goose Up +CREATE TABLE eth.queue_data ( + id SERIAL PRIMARY KEY, + data BYTEA NOT NULL, + height BIGINT UNIQUE NOT NULL +); + +-- +goose Down +DROP TABLE eth.queue_data; \ No newline at end of file diff --git a/db/migrations/00025_create_btc_queued_data_table.sql b/db/migrations/00025_create_btc_queued_data_table.sql new file mode 100644 index 00000000..c1344f86 --- /dev/null +++ b/db/migrations/00025_create_btc_queued_data_table.sql @@ -0,0 +1,9 @@ +-- +goose Up +CREATE TABLE btc.queue_data ( + id SERIAL PRIMARY KEY, + data BYTEA NOT NULL, + height BIGINT UNIQUE NOT NULL +); + +-- +goose Down +DROP TABLE btc.queue_data; \ No newline at end of file diff --git a/db/schema.sql b/db/schema.sql index d0cef093..367674d3 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -68,6 +68,37 @@ CREATE SEQUENCE btc.header_cids_id_seq ALTER SEQUENCE btc.header_cids_id_seq OWNED BY btc.header_cids.id; +-- +-- Name: queue_data; Type: TABLE; Schema: btc; Owner: - +-- + +CREATE TABLE btc.queue_data ( + id integer NOT NULL, + data bytea NOT NULL, + height bigint NOT NULL +); + + +-- +-- Name: queue_data_id_seq; Type: SEQUENCE; Schema: btc; Owner: - +-- + +CREATE SEQUENCE btc.queue_data_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: queue_data_id_seq; Type: SEQUENCE OWNED BY; Schema: btc; Owner: - +-- + +ALTER SEQUENCE btc.queue_data_id_seq OWNED BY btc.queue_data.id; + + -- -- Name: transaction_cids; Type: TABLE; Schema: btc; Owner: - -- @@ -185,7 +216,8 @@ CREATE TABLE eth.header_cids ( parent_hash character varying(66) NOT NULL, cid text NOT NULL, td numeric NOT NULL, - node_id integer NOT NULL + node_id integer NOT NULL, + reward numeric NOT NULL ); @@ -209,6 +241,37 @@ CREATE SEQUENCE eth.header_cids_id_seq ALTER SEQUENCE eth.header_cids_id_seq OWNED BY eth.header_cids.id; +-- +-- Name: queue_data; Type: TABLE; Schema: eth; Owner: - +-- + +CREATE TABLE eth.queue_data ( + id integer NOT NULL, + data bytea NOT NULL, + height bigint NOT NULL +); + + +-- +-- Name: queue_data_id_seq; Type: SEQUENCE; Schema: eth; Owner: - +-- + +CREATE SEQUENCE eth.queue_data_id_seq + AS integer + START WITH 1 + INCREMENT BY 1 + NO MINVALUE + NO MAXVALUE + CACHE 1; + + +-- +-- Name: queue_data_id_seq; Type: SEQUENCE OWNED BY; Schema: eth; Owner: - +-- + +ALTER SEQUENCE eth.queue_data_id_seq OWNED BY eth.queue_data.id; + + -- -- Name: receipt_cids; Type: TABLE; Schema: eth; Owner: - -- @@ -355,7 +418,8 @@ CREATE TABLE eth.uncle_cids ( header_id integer NOT NULL, block_hash character varying(66) NOT NULL, parent_hash character varying(66) NOT NULL, - cid text NOT NULL + cid text NOT NULL, + reward numeric NOT NULL ); @@ -771,6 +835,13 @@ ALTER SEQUENCE public.watched_logs_id_seq OWNED BY public.watched_logs.id; ALTER TABLE ONLY btc.header_cids ALTER COLUMN id SET DEFAULT nextval('btc.header_cids_id_seq'::regclass); +-- +-- Name: queue_data id; Type: DEFAULT; Schema: btc; Owner: - +-- + +ALTER TABLE ONLY btc.queue_data ALTER COLUMN id SET DEFAULT nextval('btc.queue_data_id_seq'::regclass); + + -- -- Name: transaction_cids id; Type: DEFAULT; Schema: btc; Owner: - -- @@ -799,6 +870,13 @@ ALTER TABLE ONLY btc.tx_outputs ALTER COLUMN id SET DEFAULT nextval('btc.tx_outp ALTER TABLE ONLY eth.header_cids ALTER COLUMN id SET DEFAULT nextval('eth.header_cids_id_seq'::regclass); +-- +-- Name: queue_data id; Type: DEFAULT; Schema: eth; Owner: - +-- + +ALTER TABLE ONLY eth.queue_data ALTER COLUMN id SET DEFAULT nextval('eth.queue_data_id_seq'::regclass); + + -- -- Name: receipt_cids id; Type: DEFAULT; Schema: eth; Owner: - -- @@ -927,6 +1005,22 @@ ALTER TABLE ONLY btc.header_cids ADD CONSTRAINT header_cids_pkey PRIMARY KEY (id); +-- +-- Name: queue_data queue_data_height_key; Type: CONSTRAINT; Schema: btc; Owner: - +-- + +ALTER TABLE ONLY btc.queue_data + ADD CONSTRAINT queue_data_height_key UNIQUE (height); + + +-- +-- Name: queue_data queue_data_pkey; Type: CONSTRAINT; Schema: btc; Owner: - +-- + +ALTER TABLE ONLY btc.queue_data + ADD CONSTRAINT queue_data_pkey PRIMARY KEY (id); + + -- -- Name: transaction_cids transaction_cids_pkey; Type: CONSTRAINT; Schema: btc; Owner: - -- @@ -991,6 +1085,22 @@ ALTER TABLE ONLY eth.header_cids ADD CONSTRAINT header_cids_pkey PRIMARY KEY (id); +-- +-- Name: queue_data queue_data_height_key; Type: CONSTRAINT; Schema: eth; Owner: - +-- + +ALTER TABLE ONLY eth.queue_data + ADD CONSTRAINT queue_data_height_key UNIQUE (height); + + +-- +-- Name: queue_data queue_data_pkey; Type: CONSTRAINT; Schema: eth; Owner: - +-- + +ALTER TABLE ONLY eth.queue_data + ADD CONSTRAINT queue_data_pkey PRIMARY KEY (id); + + -- -- Name: receipt_cids receipt_cids_pkey; Type: CONSTRAINT; Schema: eth; Owner: - -- diff --git a/documentation/super_node/subscription.md b/documentation/super_node/subscription.md index 09bd1ae0..a9a68ec2 100644 --- a/documentation/super_node/subscription.md +++ b/documentation/super_node/subscription.md @@ -17,7 +17,7 @@ The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubs ```toml [superNode] [superNode.ethSubscription] - historicalData = true + historicalData = false historicalDataOnly = false startingBlock = 0 endingBlock = 0 @@ -27,26 +27,18 @@ The config for `streamEthSubscribe` has a set of parameters to fill the [EthSubs uncles = false [superNode.ethSubscription.txFilter] off = false - src = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - dst = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] + src = [] + dst = [] [superNode.ethSubscription.receiptFilter] off = false contracts = [] - topics = [ - [ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" - ] - ] + topic0s = [] + topic1s = [] + topic2s = [] + topic3s = [] [superNode.ethSubscription.stateFilter] off = false - addresses = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" - ] + addresses = [] intermediateNodes = false [superNode.ethSubscription.storageFilter] off = true diff --git a/environments/superNodeSubscription.toml b/environments/superNodeSubscription.toml index b3cea0c0..79f21775 100644 --- a/environments/superNodeSubscription.toml +++ b/environments/superNodeSubscription.toml @@ -1,6 +1,6 @@ [superNode] [superNode.ethSubscription] - historicalData = true + historicalData = false historicalDataOnly = false startingBlock = 0 endingBlock = 0 @@ -10,26 +10,18 @@ uncles = false [superNode.ethSubscription.txFilter] off = false - src = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] - dst = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe", - ] + src = [] + dst = [] [superNode.ethSubscription.receiptFilter] off = false contracts = [] - topics = [ - [ - "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", - "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" - ] - ] + topic0s = [] + topic1s = [] + topic2s = [] + topic3s = [] [superNode.ethSubscription.stateFilter] off = false - addresses = [ - "0xde0B295669a9FD93d5F28D9Ec85E40f4cb697BAe" - ] + addresses = [] intermediateNodes = false [superNode.ethSubscription.storageFilter] off = true diff --git a/libraries/shared/streamer/super_node_streamer.go b/libraries/shared/streamer/super_node_streamer.go index 26c9479a..e57f2b44 100644 --- a/libraries/shared/streamer/super_node_streamer.go +++ b/libraries/shared/streamer/super_node_streamer.go @@ -22,7 +22,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // SuperNodeStreamer is the underlying struct for the shared.SuperNodeStreamer interface @@ -38,6 +37,6 @@ func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer { } // Stream is the main loop for subscribing to data from a vulcanizedb super node -func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) { - return sds.Client.Subscribe("vdb", payloadChan, "stream", params) +func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) { + return sds.Client.Subscribe("vdb", payloadChan, "stream", rlpParams) } diff --git a/pkg/super_node/api.go b/pkg/super_node/api.go index 32dcf2c2..85839f86 100644 --- a/pkg/super_node/api.go +++ b/pkg/super_node/api.go @@ -19,12 +19,15 @@ package super_node import ( "context" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/eth/core" + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // APIName is the namespace used for the state diffing service API @@ -46,7 +49,24 @@ func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI { } // Stream is the public method to setup a subscription that fires off super node payloads as they are processed -func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params shared.SubscriptionSettings) (*rpc.Subscription, error) { +func (api *PublicSuperNodeAPI) Stream(ctx context.Context, rlpParams []byte) (*rpc.Subscription, error) { + var params shared.SubscriptionSettings + switch api.sn.Chain() { + case shared.Ethereum: + var ethParams eth.SubscriptionSettings + if err := rlp.DecodeBytes(rlpParams, ðParams); err != nil { + return nil, err + } + params = ðParams + case shared.Bitcoin: + var btcParams btc.SubscriptionSettings + if err := rlp.DecodeBytes(rlpParams, &btcParams); err != nil { + return nil, err + } + params = &btcParams + default: + panic("SuperNode is not configured for a specific chain type") + } // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { diff --git a/pkg/super_node/btc/filterer.go b/pkg/super_node/btc/filterer.go index 1a749274..5fa55d68 100644 --- a/pkg/super_node/btc/filterer.go +++ b/pkg/super_node/btc/filterer.go @@ -72,10 +72,10 @@ func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IP if err != nil { return err } - response.Headers = append(response.Headers, ipfs.BlockModel{ + response.Header = ipfs.BlockModel{ Data: data, CID: cid.String(), - }) + } } return nil } diff --git a/pkg/super_node/btc/ipld_fetcher.go b/pkg/super_node/btc/ipld_fetcher.go index b7fb49cd..c839e031 100644 --- a/pkg/super_node/btc/ipld_fetcher.go +++ b/pkg/super_node/btc/ipld_fetcher.go @@ -60,7 +60,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { iplds := IPLDs{} iplds.BlockNumber = cidWrapper.BlockNumber var err error - iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) + iplds.Header, err = f.FetchHeader(cidWrapper.Header) if err != nil { return nil, err } @@ -72,30 +72,21 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { } // FetchHeaders fetches headers -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching header iplds") - headerCids := make([]cid.Cid, len(cids)) - for i, c := range cids { - dc, err := cid.Decode(c.CID) - if err != nil { - return nil, err - } - headerCids[i] = dc +// It uses the f.fetch method +func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) { + log.Debug("fetching header ipld") + dc, err := cid.Decode(c.CID) + if err != nil { + return ipfs.BlockModel{}, err } - headers := f.fetchBatch(headerCids) - headerIPLDs := make([]ipfs.BlockModel, len(headers)) - for i, header := range headers { - headerIPLDs[i] = ipfs.BlockModel{ - Data: header.RawData(), - CID: header.Cid().String(), - } + header, err := f.fetch(dc) + if err != nil { + return ipfs.BlockModel{}, err } - if len(headerIPLDs) != len(headerCids) { - log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) - return headerIPLDs, errUnexpectedNumberOfIPLDs - } - return headerIPLDs, nil + return ipfs.BlockModel{ + Data: header.RawData(), + CID: header.Cid().String(), + }, nil } // FetchTrxs fetches transactions diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go index d49bd3c4..864812bd 100644 --- a/pkg/super_node/btc/retriever.go +++ b/pkg/super_node/btc/retriever.go @@ -57,7 +57,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { } // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { +func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { streamFilter, ok := filter.(*SubscriptionSettings) if !ok { return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) @@ -68,38 +68,42 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe return nil, true, err } - cw := new(CIDWrapper) - cw.BlockNumber = big.NewInt(blockNumber) // Retrieve cached header CIDs - if !streamFilter.HeaderFilter.Off { - cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) - } - log.Error("header cid retrieval error") - return nil, true, err + headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } + log.Error("header cid retrieval error") + return nil, true, err } - // Retrieve cached trx CIDs - if !streamFilter.TxFilter.Off { - cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) - } - log.Error("transaction cid retrieval error") - return nil, true, err + cws := make([]shared.CIDsForFetching, len(headers)) + empty := true + for i, header := range headers { + cw := new(CIDWrapper) + cw.BlockNumber = big.NewInt(blockNumber) + if !streamFilter.HeaderFilter.Off { + cw.Header = header + empty = false } + // Retrieve cached trx CIDs + if !streamFilter.TxFilter.Off { + cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("transaction cid retrieval error") + return nil, true, err + } + if len(cw.Transactions) > 0 { + empty = false + } + } + cws[i] = cw } - return cw, empty(cw), tx.Commit() -} -func empty(cidWrapper *CIDWrapper) bool { - if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 { - return false - } - return true + return cws, empty, tx.Commit() } // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight @@ -111,32 +115,10 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H return headers, tx.Select(&headers, pgStr, blockNumber) } -/* -type TxModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` - Index int64 `db:"index"` - TxHash string `db:"tx_hash"` - CID string `db:"cid"` - SegWit bool `db:"segwit"` - WitnessHash string `db:"witness_hash"` -} -// TxFilter contains filter settings for txs -type TxFilter struct { - Off bool - Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to - Segwit bool // allow filtering for segwit trxs - WitnessHashes []string // allow filtering for specific witness hashes - PkScriptClass uint8 // allow filtering for txs that have at least one tx output with the specified pkscript class - MultiSig bool // allow filtering for txs that have at least one tx output that requires more than one signature - Addresses []string // allow filtering for txs that have at least one tx output with at least one of the provided addresses -} -*/ - // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) { - log.Debug("retrieving transaction cids for block ", blockNumber) +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { + log.Debug("retrieving transaction cids for header id ", headerID) args := make([]interface{}, 0, 3) results := make([]TxModel, 0) id := 1 @@ -147,8 +129,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum WHERE transaction_cids.header_id = header_cids.id AND tx_inputs.tx_id = transaction_cids.id AND tx_outputs.tx_id = transaction_cids.id - AND header_cids.block_number = $%d`, id) - args = append(args, blockNumber) + AND header_cids.id = $%d`, id) + args = append(args, headerID) id++ if txFilter.Segwit { pgStr += ` AND transaction_cids.segwit = true` diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index 03282b51..8e8e74b9 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -58,7 +58,7 @@ type CIDPayload struct { // Passed to IPLDFetcher type CIDWrapper struct { BlockNumber *big.Int - Headers []HeaderModel + Header HeaderModel Transactions []TxModel } @@ -66,7 +66,7 @@ type CIDWrapper struct { // Returned by IPLDFetcher and ResponseFilterer type IPLDs struct { BlockNumber *big.Int - Headers []ipfs.BlockModel + Header ipfs.BlockModel Transactions []ipfs.BlockModel } diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index 8d53b759..0b23a0d5 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -86,7 +86,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe return nil, fmt.Errorf("header at block %d is not available", number) } // Fetch the header IPLDs for those CIDs - headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCids[0]}) + headerIPLD, err := b.Fetcher.FetchHeader(headerCids[0]) if err != nil { return nil, err } @@ -94,7 +94,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe // We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already // confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing var header types.Header - if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil { + if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil { return nil, err } return &header, nil @@ -172,12 +172,12 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber } // Fetch and decode the header IPLD - headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID}) + headerIPLD, err := b.Fetcher.FetchHeader(headerCID) if err != nil { return nil, err } var header types.Header - if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil { + if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil { return nil, err } // Fetch and decode the uncle IPLDs @@ -232,12 +232,12 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo return nil, err } // Fetch and decode the header IPLD - headerIPLDs, err := b.Fetcher.FetchHeaders([]HeaderModel{headerCID}) + headerIPLD, err := b.Fetcher.FetchHeader(headerCID) if err != nil { return nil, err } var header types.Header - if err := rlp.DecodeBytes(headerIPLDs[0].Data, &header); err != nil { + if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil { return nil, err } // Fetch and decode the uncle IPLDs diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 33548c80..8e1ccad8 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -87,10 +87,10 @@ func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IP if err != nil { return err } - response.Headers = append(response.Headers, ipfs.BlockModel{ + response.Header = ipfs.BlockModel{ Data: headerRLP, CID: cid.String(), - }) + } if headerFilter.Uncles { response.Uncles = make([]ipfs.BlockModel, len(payload.Block.Body().Uncles)) for i, uncle := range payload.Block.Body().Uncles { @@ -126,6 +126,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLD trxHashes = make([]common.Hash, 0, trxLen) response.Transactions = make([]ipfs.BlockModel, 0, trxLen) for i, trx := range payload.Block.Body().Transactions { + // TODO: check if want corresponding receipt and if we do we must include this transaction if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) if err := trx.EncodeRLP(trxBuffer); err != nil { diff --git a/pkg/super_node/eth/filterer_test.go b/pkg/super_node/eth/filterer_test.go index f4a1a3e5..98ef3c09 100644 --- a/pkg/super_node/eth/filterer_test.go +++ b/pkg/super_node/eth/filterer_test.go @@ -43,7 +43,7 @@ var _ = Describe("Filterer", func() { iplds, ok := payload.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(iplds.Headers).To(Equal(mocks.MockIPLDs.Headers)) + Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) var expectedEmptyUncles []ipfs.BlockModel Expect(iplds.Uncles).To(Equal(expectedEmptyUncles)) Expect(len(iplds.Transactions)).To(Equal(2)) @@ -77,7 +77,7 @@ var _ = Describe("Filterer", func() { iplds1, ok := payload1.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds1.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds1.Headers)).To(Equal(0)) + Expect(iplds1.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds1.Uncles)).To(Equal(0)) Expect(len(iplds1.Transactions)).To(Equal(0)) Expect(len(iplds1.StorageNodes)).To(Equal(0)) @@ -93,7 +93,7 @@ var _ = Describe("Filterer", func() { iplds2, ok := payload2.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds2.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds2.Headers)).To(Equal(0)) + Expect(iplds2.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds2.Uncles)).To(Equal(0)) Expect(len(iplds2.Transactions)).To(Equal(0)) Expect(len(iplds2.StorageNodes)).To(Equal(0)) @@ -109,7 +109,7 @@ var _ = Describe("Filterer", func() { iplds3, ok := payload3.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds3.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds3.Headers)).To(Equal(0)) + Expect(iplds3.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds3.Uncles)).To(Equal(0)) Expect(len(iplds3.Transactions)).To(Equal(0)) Expect(len(iplds3.StorageNodes)).To(Equal(0)) @@ -125,7 +125,7 @@ var _ = Describe("Filterer", func() { iplds4, ok := payload4.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds4.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds4.Headers)).To(Equal(0)) + Expect(iplds4.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds4.Uncles)).To(Equal(0)) Expect(len(iplds4.Transactions)).To(Equal(0)) Expect(len(iplds4.StorageNodes)).To(Equal(0)) @@ -141,7 +141,7 @@ var _ = Describe("Filterer", func() { iplds5, ok := payload5.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds5.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds5.Headers)).To(Equal(0)) + Expect(iplds5.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds5.Uncles)).To(Equal(0)) Expect(len(iplds5.Transactions)).To(Equal(2)) Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) @@ -157,7 +157,7 @@ var _ = Describe("Filterer", func() { iplds6, ok := payload6.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds6.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds6.Headers)).To(Equal(0)) + Expect(iplds6.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds6.Uncles)).To(Equal(0)) Expect(len(iplds6.Transactions)).To(Equal(1)) Expect(shared.IPLDsContainBytes(iplds5.Transactions, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) @@ -174,7 +174,7 @@ var _ = Describe("Filterer", func() { iplds7, ok := payload7.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds7.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds7.Headers)).To(Equal(0)) + Expect(iplds7.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds7.Uncles)).To(Equal(0)) Expect(len(iplds7.Transactions)).To(Equal(0)) Expect(len(iplds7.StorageNodes)).To(Equal(0)) @@ -191,7 +191,7 @@ var _ = Describe("Filterer", func() { iplds8, ok := payload8.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds8.BlockNumber.Int64()).To(Equal(mocks.MockIPLDs.BlockNumber.Int64())) - Expect(len(iplds8.Headers)).To(Equal(0)) + Expect(iplds8.Header).To(Equal(ipfs.BlockModel{})) Expect(len(iplds8.Uncles)).To(Equal(0)) Expect(len(iplds8.Transactions)).To(Equal(0)) Expect(len(iplds8.StorageNodes)).To(Equal(0)) diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index 7b81e66c..77637677 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -55,6 +55,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { if err := tx.Rollback(); err != nil { log.Error(err) } + log.Error("eth indexer error when indexing header") return err } for _, uncle := range cidPayload.UncleCIDs { @@ -62,6 +63,7 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { if err := tx.Rollback(); err != nil { log.Error(err) } + log.Error("eth indexer error when indexing uncle") return err } } @@ -69,12 +71,14 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { if err := tx.Rollback(); err != nil { log.Error(err) } + log.Error("eth indexer error when indexing transactions and receipts") return err } if err := in.indexStateAndStorageCIDs(tx, cidPayload, headerID); err != nil { if err := tx.Rollback(); err != nil { log.Error(err) } + log.Error("eth indexer error when indexing state and storage nodes") return err } return tx.Commit() @@ -92,7 +96,7 @@ func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel, nodeID int func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int64) error { _, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward) = ($3, $4, $5)`, - uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID) + uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward) return err } diff --git a/pkg/super_node/eth/ipld_fetcher.go b/pkg/super_node/eth/ipld_fetcher.go index 098025bf..4401c774 100644 --- a/pkg/super_node/eth/ipld_fetcher.go +++ b/pkg/super_node/eth/ipld_fetcher.go @@ -61,7 +61,7 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { iplds := IPLDs{} iplds.BlockNumber = cidWrapper.BlockNumber var err error - iplds.Headers, err = f.FetchHeaders(cidWrapper.Headers) + iplds.Header, err = f.FetchHeader(cidWrapper.Header) if err != nil { return nil, err } @@ -89,30 +89,21 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { } // FetchHeaders fetches headers -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching header iplds") - headerCids := make([]cid.Cid, len(cids)) - for i, c := range cids { - dc, err := cid.Decode(c.CID) - if err != nil { - return nil, err - } - headerCids[i] = dc +// It uses the f.fetch method +func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) { + log.Debug("fetching header ipld") + dc, err := cid.Decode(c.CID) + if err != nil { + return ipfs.BlockModel{}, err } - headers := f.fetchBatch(headerCids) - headerIPLDs := make([]ipfs.BlockModel, len(headers)) - for i, header := range headers { - headerIPLDs[i] = ipfs.BlockModel{ - Data: header.RawData(), - CID: header.Cid().String(), - } + header, err := f.fetch(dc) + if err != nil { + return ipfs.BlockModel{}, err } - if len(headerIPLDs) != len(headerCids) { - log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) - return headerIPLDs, errUnexpectedNumberOfIPLDs - } - return headerIPLDs, nil + return ipfs.BlockModel{ + Data: header.RawData(), + CID: header.Cid().String(), + }, nil } // FetchUncles fetches uncles diff --git a/pkg/super_node/eth/ipld_fetcher_test.go b/pkg/super_node/eth/ipld_fetcher_test.go index 34044eb5..d409862c 100644 --- a/pkg/super_node/eth/ipld_fetcher_test.go +++ b/pkg/super_node/eth/ipld_fetcher_test.go @@ -50,10 +50,8 @@ var ( mockBlockService *mocks.MockIPFSBlockService mockCIDWrapper = ð.CIDWrapper{ BlockNumber: big.NewInt(9000), - Headers: []eth.HeaderModel{ - { - CID: mockHeaderBlock.Cid().String(), - }, + Header: eth.HeaderModel{ + CID: mockHeaderBlock.Cid().String(), }, Uncles: []eth.UncleModel{ { @@ -107,8 +105,7 @@ var _ = Describe("Fetcher", func() { iplds, ok := i.(eth.IPLDs) Expect(ok).To(BeTrue()) Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) - Expect(len(iplds.Headers)).To(Equal(1)) - Expect(iplds.Headers[0]).To(Equal(ipfs.BlockModel{ + Expect(iplds.Header).To(Equal(ipfs.BlockModel{ Data: mockHeaderBlock.RawData(), CID: mockHeaderBlock.Cid().String(), })) diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index 52a978d4..80065e7b 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -296,15 +296,13 @@ var ( MockCIDWrapper = ð.CIDWrapper{ BlockNumber: big.NewInt(1), - Headers: []eth2.HeaderModel{ - { - BlockNumber: "1", - BlockHash: MockBlock.Hash().String(), - ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000", - CID: HeaderCID.String(), - TotalDifficulty: MockBlock.Difficulty().String(), - Reward: "5000000000000000000", - }, + Header: eth2.HeaderModel{ + BlockNumber: "1", + BlockHash: MockBlock.Hash().String(), + ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000", + CID: HeaderCID.String(), + TotalDifficulty: MockBlock.Difficulty().String(), + Reward: "5000000000000000000", }, Transactions: MockTrxMetaPostPublsh, Receipts: MockRctMetaPostPublish, @@ -331,11 +329,9 @@ var ( MockIPLDs = eth.IPLDs{ BlockNumber: big.NewInt(1), - Headers: []ipfs.BlockModel{ - { - Data: HeaderIPLD.RawData(), - CID: HeaderIPLD.Cid().String(), - }, + Header: ipfs.BlockModel{ + Data: HeaderIPLD.RawData(), + CID: HeaderIPLD.Cid().String(), }, Transactions: []ipfs.BlockModel{ { diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index a360a654..c56dc514 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -57,7 +57,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { } // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { +func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { streamFilter, ok := filter.(*SubscriptionSettings) if !ok { return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) @@ -68,21 +68,26 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe return nil, true, err } - cw := new(CIDWrapper) - cw.BlockNumber = big.NewInt(blockNumber) - // Retrieve cached header CIDs - if !streamFilter.HeaderFilter.Off { - cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) - } - log.Error("header cid retrieval error") - return nil, true, err + // Retrieve cached header CIDs at this block height + headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } - if streamFilter.HeaderFilter.Uncles { - for _, headerCID := range cw.Headers { - uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID) + log.Error("header cid retrieval error") + return nil, true, err + } + cws := make([]shared.CIDsForFetching, len(headers)) + empty := true + for i, header := range headers { + cw := new(CIDWrapper) + cw.BlockNumber = big.NewInt(blockNumber) + if !streamFilter.HeaderFilter.Off { + cw.Header = header + empty = false + if streamFilter.HeaderFilter.Uncles { + // Retrieve uncle cids for this header id + uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID) if err != nil { if err := tx.Rollback(); err != nil { log.Error(err) @@ -90,66 +95,73 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe log.Error("uncle cid retrieval error") return nil, true, err } - cw.Uncles = append(cw.Uncles, uncleCIDs...) + cw.Uncles = uncleCIDs } } - } - // Retrieve cached trx CIDs - if !streamFilter.TxFilter.Off { - cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) + // Retrieve cached trx CIDs + if !streamFilter.TxFilter.Off { + cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("transaction cid retrieval error") + return nil, true, err } - log.Error("transaction cid retrieval error") - return nil, true, err - } - } - trxIds := make([]int64, 0, len(cw.Transactions)) - for _, tx := range cw.Transactions { - trxIds = append(trxIds, tx.ID) - } - // Retrieve cached receipt CIDs - if !streamFilter.ReceiptFilter.Off { - cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilter.ReceiptFilter, blockNumber, nil, trxIds) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) + if len(cw.Transactions) > 0 { + empty = false } - log.Error("receipt cid retrieval error") - return nil, true, err } - } - // Retrieve cached state CIDs - if !streamFilter.StateFilter.Off { - cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) + trxIds := make([]int64, len(cw.Transactions)) + for j, tx := range cw.Transactions { + trxIds[j] = tx.ID + } + // Retrieve cached receipt CIDs + if !streamFilter.ReceiptFilter.Off { + cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, streamFilter.ReceiptFilter, header.ID, trxIds) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("receipt cid retrieval error") + return nil, true, err } - log.Error("state cid retrieval error") - return nil, true, err - } - } - // Retrieve cached storage CIDs - if !streamFilter.StorageFilter.Off { - cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, blockNumber) - if err != nil { - if err := tx.Rollback(); err != nil { - log.Error(err) + if len(cw.Receipts) > 0 { + empty = false } - log.Error("storage cid retrieval error") - return nil, true, err } + // Retrieve cached state CIDs + if !streamFilter.StateFilter.Off { + cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, header.ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("state cid retrieval error") + return nil, true, err + } + if len(cw.StateNodes) > 0 { + empty = false + } + } + // Retrieve cached storage CIDs + if !streamFilter.StorageFilter.Off { + cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, header.ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("storage cid retrieval error") + return nil, true, err + } + if len(cw.StorageNodes) > 0 { + empty = false + } + } + cws[i] = cw } - return cw, empty(cw), tx.Commit() -} -func empty(cidWrapper *CIDWrapper) bool { - if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 || len(cidWrapper.Uncles) > 0 || len(cidWrapper.Receipts) > 0 || len(cidWrapper.StateNodes) > 0 || len(cidWrapper.StorageNodes) > 0 { - return false - } - return true + return cws, empty, tx.Commit() } // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight @@ -172,8 +184,8 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64 // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) { - log.Debug("retrieving transaction cids for block ", blockNumber) +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { + log.Debug("retrieving transaction cids for header id ", headerID) args := make([]interface{}, 0, 3) results := make([]TxModel, 0) id := 1 @@ -181,8 +193,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.dst, transaction_cids.src, transaction_cids.index FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $%d`, id) - args = append(args, blockNumber) + WHERE header_cids.id = $%d`, id) + args = append(args, headerID) id++ if len(txFilter.Dst) > 0 { pgStr += fmt.Sprintf(` AND transaction_cids.dst = ANY($%d::VARCHAR(66)[])`, id) @@ -197,18 +209,98 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNum return results, tx.Select(&results, pgStr, args...) } -// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided +// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { - log.Debug("retrieving receipt cids for block ", blockNumber) - id := 1 +func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) { + log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, + receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, + receipt_cids.topic2s, receipt_cids.topic3s + FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.id = $1` + id := 2 + args = append(args, headerID) + if len(rctFilter.Contracts) > 0 { + // Filter on contract addresses if there are any + pgStr += fmt.Sprintf(` AND ((receipt_cids.contract = ANY($%d::VARCHAR(66)[])`, id) + args = append(args, pq.Array(rctFilter.Contracts)) + id++ + // Filter on topics if there are any + if hasTopics(rctFilter.Topics) { + pgStr += " AND (" + first := true + for i, topicSet := range rctFilter.Topics { + if i < 4 && len(topicSet) > 0 { + if first { + pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + first = false + } else { + pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + } + args = append(args, pq.Array(topicSet)) + id++ + } + } + pgStr += ")" + } + pgStr += ")" + // Filter on txIDs if there are any and we are matching txs + if rctFilter.MatchTxs && len(trxIds) > 0 { + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) + args = append(args, pq.Array(trxIds)) + } + pgStr += ")" + } else { // If there are no contract addresses to filter on + // Filter on topics if there are any + if hasTopics(rctFilter.Topics) { + pgStr += " AND ((" + first := true + for i, topicSet := range rctFilter.Topics { + if i < 4 && len(topicSet) > 0 { + if first { + pgStr += fmt.Sprintf(`receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + first = false + } else { + pgStr += fmt.Sprintf(` AND receipt_cids.topic%ds && $%d::VARCHAR(66)[]`, i, id) + } + args = append(args, pq.Array(topicSet)) + id++ + } + } + pgStr += ")" + // Filter on txIDs if there are any and we are matching txs + if rctFilter.MatchTxs && len(trxIds) > 0 { + pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) + args = append(args, pq.Array(trxIds)) + } + pgStr += ")" + } else if rctFilter.MatchTxs && len(trxIds) > 0 { + // If there are no contract addresses or topics to filter on, + // Filter on txIDs if there are any and we are matching txs + pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id) + args = append(args, pq.Array(trxIds)) + } + } + pgStr += ` ORDER BY transaction_cids.index` + receiptCids := make([]ReceiptModel, 0) + return receiptCids, tx.Select(&receiptCids, pgStr, args...) +} + +// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided +// filter parameters and correspond to the provided tx ids +func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { + log.Debug("retrieving receipt cids for block ", blockNumber) + args := make([]interface{}, 0, 5) pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.topic2s, receipt_cids.topic3s FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id AND transaction_cids.header_id = header_cids.id` + id := 1 if blockNumber > 0 { pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id) args = append(args, blockNumber) @@ -294,15 +386,15 @@ func hasTopics(topics [][]string) bool { return false } -// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, blockNumber int64) ([]StateNodeModel, error) { - log.Debug("retrieving state cids for block ", blockNumber) +// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters +func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]StateNodeModel, error) { + log.Debug("retrieving state cids for header id ", headerID) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.id, state_cids.header_id, state_cids.state_key, state_cids.leaf, state_cids.cid FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $1` - args = append(args, blockNumber) + WHERE header_cids.id = $1` + args = append(args, headerID) addrLen := len(stateFilter.Addresses) if addrLen > 0 { keys := make([]string, addrLen) @@ -319,18 +411,17 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) } -// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) { - log.Debug("retrieving storage cids for block ", blockNumber) +// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters +func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) { + log.Debug("retrieving storage cids for header id ", headerID) args := make([]interface{}, 0, 3) - id := 1 - pgStr := fmt.Sprintf(`SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key, + pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key, storage_cids.leaf, storage_cids.cid, state_cids.state_key FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE storage_cids.state_id = state_cids.id AND state_cids.header_id = header_cids.id - AND header_cids.block_number = $%d`, id) - args = append(args, blockNumber) - id++ + AND header_cids.id = $1` + args = append(args, headerID) + id := 2 addrLen := len(storageFilter.Addresses) if addrLen > 0 { keys := make([]string, addrLen) diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index c713bb8b..45813569 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -230,14 +230,14 @@ var _ = Describe("Retriever", func() { cids, empty, err := retriever.Retrieve(openFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper, ok := cids.(*eth.CIDWrapper) + Expect(len(cids)).To(Equal(1)) + cidWrapper, ok := cids[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper.Headers)).To(Equal(1)) - expectedHeaderCIDs := mocks.MockCIDWrapper.Headers - expectedHeaderCIDs[0].ID = cidWrapper.Headers[0].ID - expectedHeaderCIDs[0].NodeID = cidWrapper.Headers[0].NodeID - Expect(cidWrapper.Headers).To(Equal(expectedHeaderCIDs)) + expectedHeaderCID := mocks.MockCIDWrapper.Header + expectedHeaderCID.ID = cidWrapper.Header.ID + expectedHeaderCID.NodeID = cidWrapper.Header.NodeID + Expect(cidWrapper.Header).To(Equal(expectedHeaderCID)) Expect(len(cidWrapper.Transactions)).To(Equal(2)) Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[0].CID)).To(BeTrue()) Expect(eth.TxModelsContainsCID(cidWrapper.Transactions, mocks.MockCIDWrapper.Transactions[1].CID)).To(BeTrue()) @@ -266,10 +266,11 @@ var _ = Describe("Retriever", func() { cids1, empty, err := retriever.Retrieve(rctContractFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper1, ok := cids1.(*eth.CIDWrapper) + Expect(len(cids1)).To(Equal(1)) + cidWrapper1, ok := cids1[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper1.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper1.Headers)).To(Equal(0)) + Expect(cidWrapper1.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper1.Transactions)).To(Equal(0)) Expect(len(cidWrapper1.StateNodes)).To(Equal(0)) Expect(len(cidWrapper1.StorageNodes)).To(Equal(0)) @@ -282,10 +283,11 @@ var _ = Describe("Retriever", func() { cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper2, ok := cids2.(*eth.CIDWrapper) + Expect(len(cids2)).To(Equal(1)) + cidWrapper2, ok := cids2[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper2.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper2.Headers)).To(Equal(0)) + Expect(cidWrapper2.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper2.Transactions)).To(Equal(0)) Expect(len(cidWrapper2.StateNodes)).To(Equal(0)) Expect(len(cidWrapper2.StorageNodes)).To(Equal(0)) @@ -298,10 +300,11 @@ var _ = Describe("Retriever", func() { cids3, empty, err := retriever.Retrieve(rctTopicsAndContractFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper3, ok := cids3.(*eth.CIDWrapper) + Expect(len(cids3)).To(Equal(1)) + cidWrapper3, ok := cids3[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper3.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper3.Headers)).To(Equal(0)) + Expect(cidWrapper3.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper3.Transactions)).To(Equal(0)) Expect(len(cidWrapper3.StateNodes)).To(Equal(0)) Expect(len(cidWrapper3.StorageNodes)).To(Equal(0)) @@ -314,10 +317,11 @@ var _ = Describe("Retriever", func() { cids4, empty, err := retriever.Retrieve(rctContractsAndTopicFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper4, ok := cids4.(*eth.CIDWrapper) + Expect(len(cids4)).To(Equal(1)) + cidWrapper4, ok := cids4[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper4.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper4.Headers)).To(Equal(0)) + Expect(cidWrapper4.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper4.Transactions)).To(Equal(0)) Expect(len(cidWrapper4.StateNodes)).To(Equal(0)) Expect(len(cidWrapper4.StorageNodes)).To(Equal(0)) @@ -330,10 +334,11 @@ var _ = Describe("Retriever", func() { cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper5, ok := cids5.(*eth.CIDWrapper) + Expect(len(cids5)).To(Equal(1)) + cidWrapper5, ok := cids5[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper5.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper5.Headers)).To(Equal(0)) + Expect(cidWrapper5.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper5.Transactions)).To(Equal(2)) Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx1CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cidWrapper5.Transactions, mocks.Trx2CID.String())).To(BeTrue()) @@ -346,10 +351,11 @@ var _ = Describe("Retriever", func() { cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper6, ok := cids6.(*eth.CIDWrapper) + Expect(len(cids6)).To(Equal(1)) + cidWrapper6, ok := cids6[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper6.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper6.Headers)).To(Equal(0)) + Expect(cidWrapper6.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper6.Transactions)).To(Equal(1)) expectedTxCID := mocks.MockCIDWrapper.Transactions[1] expectedTxCID.ID = cidWrapper6.Transactions[0].ID @@ -366,10 +372,11 @@ var _ = Describe("Retriever", func() { cids7, empty, err := retriever.Retrieve(stateFilter, 1) Expect(err).ToNot(HaveOccurred()) Expect(empty).ToNot(BeTrue()) - cidWrapper7, ok := cids7.(*eth.CIDWrapper) + Expect(len(cids7)).To(Equal(1)) + cidWrapper7, ok := cids7[0].(*eth.CIDWrapper) Expect(ok).To(BeTrue()) Expect(cidWrapper7.BlockNumber).To(Equal(mocks.MockCIDWrapper.BlockNumber)) - Expect(len(cidWrapper7.Headers)).To(Equal(0)) + Expect(cidWrapper7.Header).To(Equal(eth.HeaderModel{})) Expect(len(cidWrapper7.Transactions)).To(Equal(0)) Expect(len(cidWrapper7.Receipts)).To(Equal(0)) Expect(len(cidWrapper7.StorageNodes)).To(Equal(0)) diff --git a/pkg/super_node/eth/subscription_config.go b/pkg/super_node/eth/subscription_config.go index f9c77496..a6d563e6 100644 --- a/pkg/super_node/eth/subscription_config.go +++ b/pkg/super_node/eth/subscription_config.go @@ -17,7 +17,6 @@ package eth import ( - "errors" "math/big" "github.com/spf13/viper" @@ -53,7 +52,8 @@ type TxFilter struct { // ReceiptFilter contains filter settings for receipts type ReceiptFilter struct { - Off bool + Off bool + // TODO: change this so that we filter for receipts first and we always return the corresponding transaction MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions Contracts []string Topics [][]string @@ -70,7 +70,7 @@ type StateFilter struct { type StorageFilter struct { Off bool Addresses []string - StorageKeys []string + StorageKeys []string // need to be the hashs key themselves not slot position IntermediateNodes bool } @@ -96,13 +96,12 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { Src: viper.GetStringSlice("superNode.ethSubscription.txFilter.src"), Dst: viper.GetStringSlice("superNode.ethSubscription.txFilter.dst"), } - // Below defaults to false and one slice of length 0 - // Which means we get all receipts by default - t := viper.Get("superNode.ethSubscription.receiptFilter.topics") - topics, ok := t.([][]string) - if !ok { - return nil, errors.New("superNode.ethSubscription.receiptFilter.topics needs to be a slice of string slices") - } + // By default all of the topic slices will be empty => match on any/all topics + topics := make([][]string, 4) + topics[0] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic0s") + topics[1] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic1s") + topics[2] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic2s") + topics[3] = viper.GetStringSlice("superNode.ethSubscription.receiptFilter.topic3s") sc.ReceiptFilter = ReceiptFilter{ Off: viper.GetBool("superNode.ethSubscription.receiptFilter.off"), MatchTxs: viper.GetBool("superNode.ethSubscription.receiptFilter.matchTxs"), diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 77141e3c..67aac568 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -67,7 +67,7 @@ type CIDPayload struct { // Passed to IPLDFetcher type CIDWrapper struct { BlockNumber *big.Int - Headers []HeaderModel + Header HeaderModel Uncles []UncleModel Transactions []TxModel Receipts []ReceiptModel @@ -79,7 +79,7 @@ type CIDWrapper struct { // Returned by IPLDFetcher and ResponseFilterer type IPLDs struct { BlockNumber *big.Int - Headers []ipfs.BlockModel + Header ipfs.BlockModel Uncles []ipfs.BlockModel Transactions []ipfs.BlockModel Receipts []ipfs.BlockModel diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 9d78c2b6..cae7e0ac 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -53,6 +53,8 @@ type SuperNode interface { Unsubscribe(id rpc.ID) // Method to access the node info for the service Node() core.Node + // Method to access chain type + Chain() shared.ChainType } // Service is the underlying struct for the super node @@ -363,7 +365,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share log.Debug("historical data ending block:", endingBlock) go func() { for i := startingBlock; i <= endingBlock; i++ { - cidWrapper, empty, err := sap.Retriever.Retrieve(params, i) + cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) if err != nil { sendNonBlockingErr(sub, fmt.Errorf("CID Retrieval error at block %d\r%s", i, err.Error())) continue @@ -371,21 +373,23 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share if empty { continue } - response, err := sap.IPLDFetcher.Fetch(cidWrapper) - if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error())) - continue - } - responseRLP, err := rlp.EncodeToBytes(response) - if err != nil { - log.Error(err) - continue - } - select { - case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: - log.Debugf("sending super node historical data payload to subscription %s", id) - default: - log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) + for _, cids := range cidWrappers { + response, err := sap.IPLDFetcher.Fetch(cids) + if err != nil { + sendNonBlockingErr(sub, fmt.Errorf("IPLD Fetching error at block %d\r%s", i, err.Error())) + continue + } + responseRLP, err := rlp.EncodeToBytes(response) + if err != nil { + log.Error(err) + continue + } + select { + case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: + log.Debugf("sending super node historical data payload to subscription %s", id) + default: + log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) + } } } // when we are done backfilling send an empty payload signifying so in the msg @@ -443,6 +447,11 @@ func (sap *Service) Node() core.Node { return sap.NodeInfo } +// Chain returns the chain type for this service +func (sap *Service) Chain() shared.ChainType { + return sap.chain +} + // close is used to close all listening subscriptions // close needs to be called with subscription access locked func (sap *Service) close() { diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index c6d42134..f62bba76 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -52,7 +52,7 @@ type ResponseFilterer interface { // CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper type CIDRetriever interface { - Retrieve(filter SubscriptionSettings, blockNumber int64) (CIDsForFetching, bool, error) + Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDsForFetching, bool, error) RetrieveFirstBlockNumber() (int64, error) RetrieveLastBlockNumber() (int64, error) RetrieveGapsInData() ([]Gap, error) diff --git a/pkg/super_node/shared/mocks/retriever.go b/pkg/super_node/shared/mocks/retriever.go index 02af5049..93efc9a5 100644 --- a/pkg/super_node/shared/mocks/retriever.go +++ b/pkg/super_node/shared/mocks/retriever.go @@ -31,7 +31,7 @@ type CIDRetriever struct { } // RetrieveCIDs mock method -func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { +func (*CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { panic("implement me") } diff --git a/pkg/watcher/constructors.go b/pkg/watcher/constructors.go index 3e22bec9..d4e40b8e 100644 --- a/pkg/watcher/constructors.go +++ b/pkg/watcher/constructors.go @@ -23,7 +23,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/postgres" shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth" + "github.com/vulcanize/vulcanizedb/pkg/watcher/eth" "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) diff --git a/pkg/watcher/eth/repository.go b/pkg/watcher/eth/repository.go index 0c37f9c0..da2e692b 100644 --- a/pkg/watcher/eth/repository.go +++ b/pkg/watcher/eth/repository.go @@ -19,15 +19,21 @@ package eth import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/pkg/watcher/shared" ) +var ( + vacuumThreshold int64 = 5000 // dont know how to decided what this should be set to +) + // Repository is the underlying struct for satisfying the shared.Repository interface for eth type Repository struct { db *postgres.DB triggerFunctions [][2]string + deleteCalls int64 } // NewRepository returns a new eth.Repository that satisfies the shared.Repository interface @@ -35,6 +41,7 @@ func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Reposit return &Repository{ db: db, triggerFunctions: triggerFunctions, + deleteCalls: 0, } } @@ -46,12 +53,42 @@ func (r *Repository) LoadTriggers() error { // QueueData puts super node payload data into the db queue func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error { - panic("implement me") + pgStr := `INSERT INTO eth.queued_data (data, height) VALUES ($1, $2) + ON CONFLICT (height) DO UPDATE SET (data) VALUES ($1)` + _, err := r.db.Exec(pgStr, payload.Data, payload.Height) + return err } -// GetQueueData grabs super node payload data from the db queue -func (r *Repository) GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) { - panic("implement me") +// GetQueueData grabs a chunk super node payload data from the queue table so that it can +// be forwarded to the ready table +// this is used to make sure we enter data into the ready table in sequential order +// even if we receive data out-of-order +// it returns the new index +// delete the data it retrieves so as to clear the queue +func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) { + r.deleteCalls++ + pgStr := `DELETE FROM eth.queued_data + WHERE height = $1 + RETURNING *` + var res shared.QueuedData + if err := r.db.Get(&res, pgStr, height); err != nil { + return super_node.SubscriptionPayload{}, height, err + } + payload := super_node.SubscriptionPayload{ + Data: res.Data, + Height: res.Height, + Flag: super_node.EmptyFlag, + } + height++ + // Periodically clean up space in the queue table + if r.deleteCalls >= vacuumThreshold { + _, err := r.db.Exec(`VACUUM ANALYZE eth.queued_data`) + if err != nil { + logrus.Error(err) + } + r.deleteCalls = 0 + } + return payload, height, nil } // ReadyData puts super node payload data in the tables ready for processing by trigger functions diff --git a/pkg/watcher/service.go b/pkg/watcher/service.go index a15b6831..b8f50832 100644 --- a/pkg/watcher/service.go +++ b/pkg/watcher/service.go @@ -19,6 +19,9 @@ package watcher import ( "sync" "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/sirupsen/logrus" @@ -84,7 +87,11 @@ func (s *Service) Init() error { // Watch is the top level loop for watching super node func (s *Service) Watch(wg *sync.WaitGroup) error { - sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, s.WatcherConfig.SubscriptionConfig) + rlpConfig, err := rlp.EncodeToBytes(s.WatcherConfig.SubscriptionConfig) + if err != nil { + return err + } + sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, rlpConfig) if err != nil { return err } @@ -108,8 +115,13 @@ func (s *Service) Watch(wg *sync.WaitGroup) error { // combinedQueuing assumes data is not necessarily going to come in linear order // this is true when we are backfilling and streaming at the head or when we are // only streaming at the head since reorgs can occur + +// NOTE: maybe we should push everything to the wait queue, otherwise the index could be shifted as we retrieve data from it func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) { wg.Add(1) + // this goroutine is responsible for allocating incoming data to the ready or wait queue + // depending on if it is at the current index or not + forwardQuit := make(chan bool) go func() { for { select { @@ -120,7 +132,8 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio continue } if payload.Height == atomic.LoadInt64(s.payloadIndex) { - // If the data is at our current index it is ready to be processed; add it to the ready data queue + // If the data is at our current index it is ready to be processed + // add it to the ready data queue and increment the index if err := s.Repository.ReadyData(payload); err != nil { logrus.Error(err) } @@ -134,15 +147,41 @@ func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscriptio logrus.Error(err) case <-s.QuitChan: logrus.Info("Watcher shutting down") + forwardQuit <- true wg.Done() return } } }() + ticker := time.NewTicker(5 * time.Second) + // this goroutine is responsible for moving data from the wait queue to the ready queue + // preserving the correct order and alignment with the current index + go func() { + for { + select { + case <-ticker.C: + // retrieve queued data, in order, and forward it to the ready queue + index := atomic.LoadInt64(s.payloadIndex) + queueData, newIndex, err := s.Repository.GetQueueData(index) + if err != nil { + logrus.Error(err) + continue + } + atomic.StoreInt64(s.payloadIndex, newIndex) + if err := s.Repository.ReadyData(queueData); err != nil { + logrus.Error(err) + } + case <-forwardQuit: + return + default: + // do nothing, wait til next tick + } + } + }() } -// backFillOnlyQueuing assumes the data is coming in contiguously and behind the head -// it puts all data on the ready queue +// backFillOnlyQueuing assumes the data is coming in contiguously from behind the head +// it puts all data directly into the ready queue // it continues until the watcher is told to quit or we receive notification that the backfill is finished func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) { wg.Add(1) diff --git a/pkg/watcher/shared/interfaces.go b/pkg/watcher/shared/interfaces.go index 871d6fab..5e6c64ee 100644 --- a/pkg/watcher/shared/interfaces.go +++ b/pkg/watcher/shared/interfaces.go @@ -20,18 +20,17 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // Repository is the interface for the Postgres database type Repository interface { LoadTriggers() error QueueData(payload super_node.SubscriptionPayload) error - GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) + GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) ReadyData(payload super_node.SubscriptionPayload) error } // SuperNodeStreamer is the interface for streaming data from a vulcanizeDB super node type SuperNodeStreamer interface { - Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) + Stream(payloadChan chan super_node.SubscriptionPayload, rlpParams []byte) (*rpc.ClientSubscription, error) } diff --git a/pkg/watcher/shared/models.go b/pkg/watcher/shared/models.go new file mode 100644 index 00000000..e905a7e2 --- /dev/null +++ b/pkg/watcher/shared/models.go @@ -0,0 +1,24 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package shared + +// QueuedData is the db model for queued data +type QueuedData struct { + ID int64 `db:"id"` + Data []byte `db:"data"` + Height int64 `db:"height"` +}