diff --git a/documentation/super-node.md b/documentation/super-node.md index 0b42521c..1fad5903 100644 --- a/documentation/super-node.md +++ b/documentation/super-node.md @@ -328,9 +328,12 @@ not send any transactions to the subscriber; `src` and `dst` are string arrays w if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained in `src` and `dst`, respectively. -`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the super-node to +`subscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, -if it has any topics then the super-node will only send receipts that contain logs which have that topic0. +if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is +a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will +only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for +transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters. `subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go index 29d87456..53e83ca3 100644 --- a/pkg/config/subscription.go +++ b/pkg/config/subscription.go @@ -44,6 +44,7 @@ type TrxFilter struct { type ReceiptFilter struct { Off bool + MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions Contracts []string Topic0s []string } diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go index 1b7e893f..84f74ccb 100644 --- a/pkg/ipfs/fetcher.go +++ b/pkg/ipfs/fetcher.go @@ -18,6 +18,7 @@ package ipfs import ( "context" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" @@ -26,9 +27,19 @@ import ( log "github.com/sirupsen/logrus" ) +var ( + errUnexpectedNumberOfIPLDs = errors.New("ipfs batch fetch returned unexpected number of IPLDs") +) + // IPLDFetcher is an interface for fetching IPLDs type IPLDFetcher interface { FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) + FetchHeaders(cids []string) ([]blocks.Block, error) + FetchUncles(cids []string) ([]blocks.Block, error) + FetchTrxs(cids []string) ([]blocks.Block, error) + FetchRcts(cids []string) ([]blocks.Block, error) + FetchState(cids []StateNodeCID) (map[common.Hash]blocks.Block, error) + FetchStorage(cids []StorageNodeCID) (map[common.Hash]map[common.Hash]blocks.Block, error) } // EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS @@ -51,165 +62,163 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) { func (f *EthIPLDFetcher) FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) { log.Debug("fetching iplds") - blocks := &IPLDWrapper{ - BlockNumber: cids.BlockNumber, - Headers: make([]blocks.Block, 0), - Uncles: make([]blocks.Block, 0), - Transactions: make([]blocks.Block, 0), - Receipts: make([]blocks.Block, 0), - StateNodes: make(map[common.Hash]blocks.Block), - StorageNodes: make(map[common.Hash]map[common.Hash]blocks.Block), + iplds := new(IPLDWrapper) + iplds.BlockNumber = cids.BlockNumber + var err error + iplds.Headers, err = f.FetchHeaders(cids.Headers) + if err != nil { + return nil, err } - - headersErr := f.fetchHeaders(cids, blocks) - if headersErr != nil { - return nil, headersErr + iplds.Uncles, err = f.FetchUncles(cids.Uncles) + if err != nil { + return nil, err } - unclesErr := f.fetchUncles(cids, blocks) - if unclesErr != nil { - return nil, unclesErr + iplds.Transactions, err = f.FetchTrxs(cids.Transactions) + if err != nil { + return nil, err } - trxsErr := f.fetchTrxs(cids, blocks) - if trxsErr != nil { - return nil, trxsErr + iplds.Receipts, err = f.FetchRcts(cids.Receipts) + if err != nil { + return nil, err } - rctsErr := f.fetchRcts(cids, blocks) - if rctsErr != nil { - return nil, rctsErr + iplds.StateNodes, err = f.FetchState(cids.StateNodes) + if err != nil { + return nil, err } - storageErr := f.fetchStorage(cids, blocks) - if storageErr != nil { - return nil, storageErr + iplds.StorageNodes, err = f.FetchStorage(cids.StorageNodes) + if err != nil { + return nil, err } - stateErr := f.fetchState(cids, blocks) - if stateErr != nil { - return nil, stateErr - } - - return blocks, nil + return iplds, nil } -// fetchHeaders fetches headers +// FetchHeaders fetches headers // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchHeaders(cids CIDWrapper, blocks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchHeaders(cids []string) ([]blocks.Block, error) { log.Debug("fetching header iplds") - headerCids := make([]cid.Cid, 0, len(cids.Headers)) - for _, c := range cids.Headers { + headerCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { dc, err := cid.Decode(c) if err != nil { - return err + return nil, err } headerCids = append(headerCids, dc) } - blocks.Headers = f.fetchBatch(headerCids) - if len(blocks.Headers) != len(headerCids) { - log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(blocks.Headers), len(headerCids)) + headers := f.fetchBatch(headerCids) + if len(headers) != len(headerCids) { + log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids)) + return headers, errUnexpectedNumberOfIPLDs } - return nil + return headers, nil } -// fetchUncles fetches uncles +// FetchUncles fetches uncles // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchUncles(cids CIDWrapper, blocks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchUncles(cids []string) ([]blocks.Block, error) { log.Debug("fetching uncle iplds") - uncleCids := make([]cid.Cid, 0, len(cids.Uncles)) - for _, c := range cids.Uncles { + uncleCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { dc, err := cid.Decode(c) if err != nil { - return err + return nil, err } uncleCids = append(uncleCids, dc) } - blocks.Uncles = f.fetchBatch(uncleCids) - if len(blocks.Uncles) != len(uncleCids) { - log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(blocks.Uncles), len(uncleCids)) + uncles := f.fetchBatch(uncleCids) + if len(uncles) != len(uncleCids) { + log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids)) + return uncles, errUnexpectedNumberOfIPLDs } - return nil + return uncles, nil } -// fetchTrxs fetches transactions +// FetchTrxs fetches transactions // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchTrxs(cids CIDWrapper, blocks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchTrxs(cids []string) ([]blocks.Block, error) { log.Debug("fetching transaction iplds") - trxCids := make([]cid.Cid, 0, len(cids.Transactions)) - for _, c := range cids.Transactions { + trxCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { dc, err := cid.Decode(c) if err != nil { - return err + return nil, err } trxCids = append(trxCids, dc) } - blocks.Transactions = f.fetchBatch(trxCids) - if len(blocks.Transactions) != len(trxCids) { - log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(blocks.Transactions), len(trxCids)) + trxs := f.fetchBatch(trxCids) + if len(trxs) != len(trxCids) { + log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) + return trxs, errUnexpectedNumberOfIPLDs } - return nil + return trxs, nil } -// fetchRcts fetches receipts +// FetchRcts fetches receipts // It uses the f.fetchBatch method -func (f *EthIPLDFetcher) fetchRcts(cids CIDWrapper, blocks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchRcts(cids []string) ([]blocks.Block, error) { log.Debug("fetching receipt iplds") - rctCids := make([]cid.Cid, 0, len(cids.Receipts)) - for _, c := range cids.Receipts { + rctCids := make([]cid.Cid, 0, len(cids)) + for _, c := range cids { dc, err := cid.Decode(c) if err != nil { - return err + return nil, err } rctCids = append(rctCids, dc) } - blocks.Receipts = f.fetchBatch(rctCids) - if len(blocks.Receipts) != len(rctCids) { - log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(blocks.Receipts), len(rctCids)) + rcts := f.fetchBatch(rctCids) + if len(rcts) != len(rctCids) { + log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids)) + return rcts, errUnexpectedNumberOfIPLDs } - return nil + return rcts, nil } -// fetchState fetches state nodes +// FetchState fetches state nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state keys -func (f *EthIPLDFetcher) fetchState(cids CIDWrapper, blocks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchState(cids []StateNodeCID) (map[common.Hash]blocks.Block, error) { log.Debug("fetching state iplds") - for _, stateNode := range cids.StateNodes { + stateNodes := make(map[common.Hash]blocks.Block) + for _, stateNode := range cids { if stateNode.CID == "" || stateNode.Key == "" { continue } - dc, decodeErr := cid.Decode(stateNode.CID) - if decodeErr != nil { - return decodeErr + dc, err := cid.Decode(stateNode.CID) + if err != nil { + return nil, err } - block, fetchErr := f.fetch(dc) - if fetchErr != nil { - return fetchErr + state, err := f.fetch(dc) + if err != nil { + return nil, err } - blocks.StateNodes[common.HexToHash(stateNode.Key)] = block + stateNodes[common.HexToHash(stateNode.Key)] = state } - return nil + return stateNodes, nil } -// fetchStorage fetches storage nodes +// FetchStorage fetches storage nodes // It uses the single f.fetch method instead of the batch fetch, because it // needs to maintain the data's relation to state and storage keys -func (f *EthIPLDFetcher) fetchStorage(cids CIDWrapper, blks *IPLDWrapper) error { +func (f *EthIPLDFetcher) FetchStorage(cids []StorageNodeCID) (map[common.Hash]map[common.Hash]blocks.Block, error) { log.Debug("fetching storage iplds") - for _, storageNode := range cids.StorageNodes { + storageNodes := make(map[common.Hash]map[common.Hash]blocks.Block) + for _, storageNode := range cids { if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { continue } - dc, decodeErr := cid.Decode(storageNode.CID) - if decodeErr != nil { - return decodeErr + dc, err := cid.Decode(storageNode.CID) + if err != nil { + return nil, err } - blk, fetchErr := f.fetch(dc) - if fetchErr != nil { - return fetchErr + storage, err := f.fetch(dc) + if err != nil { + return nil, err } - if blks.StorageNodes[common.HexToHash(storageNode.StateKey)] == nil { - blks.StorageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) + if storageNodes[common.HexToHash(storageNode.StateKey)] == nil { + storageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) } - blks.StorageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = blk + storageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = storage } - return nil + return storageNodes, nil } // fetch is used to fetch a single cid diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go index 2f6c25ff..1f696938 100644 --- a/pkg/ipfs/mocks/test_data.go +++ b/pkg/ipfs/mocks/test_data.go @@ -312,6 +312,7 @@ var ( MockSeeNodePayload = streamer.SuperNodePayload{ BlockNumber: big.NewInt(1), HeadersRlp: [][]byte{MockHeaderRlp}, + UnclesRlp: [][]byte{}, TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, ReceiptsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, StateNodesRlp: map[common.Hash][]byte{ diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go index 6ee25af6..86e6ac40 100644 --- a/pkg/ipfs/resolver.go +++ b/pkg/ipfs/resolver.go @@ -25,6 +25,12 @@ import ( // IPLDResolver is the interface to resolving IPLDs type IPLDResolver interface { ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload + ResolveHeaders(iplds []blocks.Block) [][]byte + ResolveUncles(iplds []blocks.Block) [][]byte + ResolveTransactions(iplds []blocks.Block) [][]byte + ResolveReceipts(blocks []blocks.Block) [][]byte + ResolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte + ResolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte } // EthIPLDResolver is the underlying struct to support the IPLDResolver interface @@ -37,61 +43,64 @@ func NewIPLDResolver() *EthIPLDResolver { // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload { - response := &streamer.SuperNodePayload{ + return streamer.SuperNodePayload{ BlockNumber: ipfsBlocks.BlockNumber, - StateNodesRlp: make(map[common.Hash][]byte), - StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte), - } - eir.resolveHeaders(ipfsBlocks.Headers, response) - eir.resolveUncles(ipfsBlocks.Uncles, response) - eir.resolveTransactions(ipfsBlocks.Transactions, response) - eir.resolveReceipts(ipfsBlocks.Receipts, response) - eir.resolveState(ipfsBlocks.StateNodes, response) - eir.resolveStorage(ipfsBlocks.StorageNodes, response) - return *response -} - -func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) { - for _, block := range blocks { - raw := block.RawData() - response.HeadersRlp = append(response.HeadersRlp, raw) + HeadersRlp: eir.ResolveHeaders(ipfsBlocks.Headers), + UnclesRlp: eir.ResolveUncles(ipfsBlocks.Uncles), + TransactionsRlp: eir.ResolveTransactions(ipfsBlocks.Transactions), + ReceiptsRlp: eir.ResolveReceipts(ipfsBlocks.Receipts), + StateNodesRlp: eir.ResolveState(ipfsBlocks.StateNodes), + StorageNodesRlp: eir.ResolveStorage(ipfsBlocks.StorageNodes), } } -func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SuperNodePayload) { - for _, block := range blocks { - raw := block.RawData() - response.UnclesRlp = append(response.UnclesRlp, raw) +func (eir *EthIPLDResolver) ResolveHeaders(iplds []blocks.Block) [][]byte { + headerRlps := make([][]byte, 0, len(iplds)) + for _, ipld := range iplds { + headerRlps = append(headerRlps, ipld.RawData()) } + return headerRlps } -func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SuperNodePayload) { - for _, block := range blocks { - raw := block.RawData() - response.TransactionsRlp = append(response.TransactionsRlp, raw) +func (eir *EthIPLDResolver) ResolveUncles(iplds []blocks.Block) [][]byte { + uncleRlps := make([][]byte, 0, len(iplds)) + for _, ipld := range iplds { + uncleRlps = append(uncleRlps, ipld.RawData()) } + return uncleRlps } -func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SuperNodePayload) { - for _, block := range blocks { - raw := block.RawData() - response.ReceiptsRlp = append(response.ReceiptsRlp, raw) +func (eir *EthIPLDResolver) ResolveTransactions(iplds []blocks.Block) [][]byte { + trxs := make([][]byte, 0, len(iplds)) + for _, ipld := range iplds { + trxs = append(trxs, ipld.RawData()) } + return trxs } -func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { - for key, block := range blocks { - raw := block.RawData() - response.StateNodesRlp[key] = raw +func (eir *EthIPLDResolver) ResolveReceipts(iplds []blocks.Block) [][]byte { + rcts := make([][]byte, 0, len(iplds)) + for _, ipld := range iplds { + rcts = append(rcts, ipld.RawData()) } + return rcts } -func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { - for stateKey, storageBlocks := range blocks { - response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) - for storageKey, storageVal := range storageBlocks { - raw := storageVal.RawData() - response.StorageNodesRlp[stateKey][storageKey] = raw +func (eir *EthIPLDResolver) ResolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte { + stateNodes := make(map[common.Hash][]byte, len(iplds)) + for key, ipld := range iplds { + stateNodes[key] = ipld.RawData() + } + return stateNodes +} + +func (eir *EthIPLDResolver) ResolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte { + storageNodes := make(map[common.Hash]map[common.Hash][]byte) + for stateKey, storageIPLDs := range iplds { + storageNodes[stateKey] = make(map[common.Hash][]byte) + for storageKey, storageVal := range storageIPLDs { + storageNodes[stateKey][storageKey] = storageVal.RawData() } } + return storageNodes } diff --git a/pkg/super_node/filterer.go b/pkg/super_node/filterer.go index 58bf1be8..d99179ca 100644 --- a/pkg/super_node/filterer.go +++ b/pkg/super_node/filterer.go @@ -43,35 +43,35 @@ func NewResponseFilterer() *Filterer { // FilterResponse is used to filter through eth data to extract and package requested data into a Payload func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) { - response := new(streamer.SuperNodePayload) - headersErr := s.filterHeaders(streamFilters, response, payload) - if headersErr != nil { - return streamer.SuperNodePayload{}, headersErr + if checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { + response := new(streamer.SuperNodePayload) + if err := s.filterHeaders(streamFilters.HeaderFilter, response, payload); err != nil { + return streamer.SuperNodePayload{}, err + } + txHashes, err := s.filterTransactions(streamFilters.TrxFilter, response, payload) + if err != nil { + return streamer.SuperNodePayload{}, err + } + if err := s.filerReceipts(streamFilters.ReceiptFilter, response, payload, txHashes); err != nil { + return streamer.SuperNodePayload{}, err + } + if err := s.filterState(streamFilters.StateFilter, response, payload); err != nil { + return streamer.SuperNodePayload{}, err + } + if err := s.filterStorage(streamFilters.StorageFilter, response, payload); err != nil { + return streamer.SuperNodePayload{}, err + } + response.BlockNumber = payload.BlockNumber + return *response, nil } - txHashes, trxsErr := s.filterTransactions(streamFilters, response, payload) - if trxsErr != nil { - return streamer.SuperNodePayload{}, trxsErr - } - rctsErr := s.filerReceipts(streamFilters, response, payload, txHashes) - if rctsErr != nil { - return streamer.SuperNodePayload{}, rctsErr - } - stateErr := s.filterState(streamFilters, response, payload) - if stateErr != nil { - return streamer.SuperNodePayload{}, stateErr - } - storageErr := s.filterStorage(streamFilters, response, payload) - if storageErr != nil { - return streamer.SuperNodePayload{}, storageErr - } - response.BlockNumber = payload.BlockNumber - return *response, nil + return streamer.SuperNodePayload{}, nil } -func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { - if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { +func (s *Filterer) filterHeaders(headerFilter config.HeaderFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { + if !headerFilter.Off { response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) - if streamFilters.HeaderFilter.Uncles { + if headerFilter.Uncles { + response.UnclesRlp = make([][]byte, 0, len(payload.BlockBody.Uncles)) for _, uncle := range payload.BlockBody.Uncles { uncleRlp, err := rlp.EncodeToBytes(uncle) if err != nil { @@ -91,11 +91,11 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { +func (s *Filterer) filterTransactions(trxFilter config.TrxFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) - if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { + if !trxFilter.Off { for i, trx := range payload.BlockBody.Transactions { - if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { + if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) err := trx.EncodeRLP(trxBuffer) if err != nil { @@ -127,10 +127,10 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { - if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { +func (s *Filterer) filerReceipts(receiptFilter config.ReceiptFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { + if !receiptFilter.Off { for i, receipt := range payload.Receipts { - if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { + if checkReceipts(receipt, receiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, receiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes, receiptFilter.MatchTxs) { receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptBuffer := new(bytes.Buffer) err := receiptForStorage.EncodeRLP(receiptBuffer) @@ -144,15 +144,17 @@ func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *st return nil } -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool { - // If we aren't filtering for any topics or contracts, all topics are a go - if len(wantedTopics) == 0 && len(wantedContracts) == 0 { +func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash, matchTxs bool) bool { + // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go + if len(wantedTopics) == 0 && len(wantedContracts) == 0 && (len(wantedTrxHashes) == 0 || !matchTxs) { return true } - // No matter what filters we have, we keep receipts for the trxs we are interested in - for _, wantedTrxHash := range wantedTrxHashes { - if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { - return true + // No matter what filters we have, we keep receipts for specific trxs we are interested in + if matchTxs { + for _, wantedTrxHash := range wantedTrxHashes { + if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { + return true + } } } @@ -165,7 +167,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac } } } - } else { // We keep receipts that belong to one of the specified contracts and have logs with topics if we aren't filtering on topics + } else { // We keep all receipts that belong to one of the specified contracts if we aren't filtering on topics for _, wantedContract := range wantedContracts { if wantedContract == actualContract { if len(wantedTopics) == 0 { @@ -186,17 +188,17 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac return false } -func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { - if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { +func (s *Filterer) filterState(stateFilter config.StateFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { + if !stateFilter.Off { response.StateNodesRlp = make(map[common.Hash][]byte) - keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) - for _, addr := range streamFilters.StateFilter.Addresses { + keyFilters := make([]common.Hash, 0, len(stateFilter.Addresses)) + for _, addr := range stateFilter.Addresses { keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) keyFilters = append(keyFilters, keyFilter) } for key, stateNode := range payload.StateNodes { if checkNodeKeys(keyFilters, key) { - if stateNode.Leaf || streamFilters.StateFilter.IntermediateNodes { + if stateNode.Leaf || stateFilter.IntermediateNodes { response.StateNodesRlp[key] = stateNode.Value } } @@ -218,16 +220,16 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { - if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { +func (s *Filterer) filterStorage(storageFilter config.StorageFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { + if !storageFilter.Off { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) - stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) - for _, addr := range streamFilters.StorageFilter.Addresses { + stateKeyFilters := make([]common.Hash, 0, len(storageFilter.Addresses)) + for _, addr := range storageFilter.Addresses { keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) stateKeyFilters = append(stateKeyFilters, keyFilter) } - storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys)) - for _, store := range streamFilters.StorageFilter.StorageKeys { + storageKeyFilters := make([]common.Hash, 0, len(storageFilter.StorageKeys)) + for _, store := range storageFilter.StorageKeys { keyFilter := ipfs.HexToKey(store) storageKeyFilters = append(storageKeyFilters, keyFilter) } diff --git a/pkg/super_node/filterer_test.go b/pkg/super_node/filterer_test.go index a1c0fabd..b24b900c 100644 --- a/pkg/super_node/filterer_test.go +++ b/pkg/super_node/filterer_test.go @@ -46,7 +46,8 @@ var _ = Describe("Filterer", func() { Expect(err).ToNot(HaveOccurred()) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) - Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) + var unclesRlp [][]byte + Expect(superNodePayload.UnclesRlp).To(Equal(unclesRlp)) Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) diff --git a/pkg/super_node/mocks/retriever.go b/pkg/super_node/mocks/retriever.go index f2ca3de9..57258b37 100644 --- a/pkg/super_node/mocks/retriever.go +++ b/pkg/super_node/mocks/retriever.go @@ -1,7 +1,9 @@ package mocks import ( + "github.com/jmoiron/sqlx" "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" ) @@ -19,6 +21,36 @@ func (*MockCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNu panic("implement me") } +// RetrieveHeaderCIDs mock method +func (*MockCIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) { + panic("implement me") + +} + +// RetrieveUncleCIDs mock method +func (*MockCIDRetriever) RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) { + panic("implement me") + +} + +// RetrieveTrxCIDs mock method +func (*MockCIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error) { + panic("implement me") + +} + +// RetrieveRctCIDs mock method +func (*MockCIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error) { + panic("implement me") + +} + +// RetrieveStateCIDs mock method +func (*MockCIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error) { + panic("implement me") + +} + // RetrieveLastBlockNumber mock method func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) { panic("implement me") @@ -42,3 +74,7 @@ func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]uint64) { } mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) } + +func (mcr *MockCIDRetriever) Database() *postgres.DB { + panic("implement me") +} diff --git a/pkg/super_node/retriever.go b/pkg/super_node/retriever.go index 3a9e495c..7173b946 100644 --- a/pkg/super_node/retriever.go +++ b/pkg/super_node/retriever.go @@ -34,6 +34,12 @@ type CIDRetriever interface { RetrieveLastBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error) RetrieveGapsInData() ([][2]uint64, error) + RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) + RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) + RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error) + RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error) + RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error) + Database() *postgres.DB } // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface @@ -65,37 +71,32 @@ func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) { // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) { log.Debug("retrieving cids") - tx, beginErr := ecr.db.Beginx() - if beginErr != nil { - return nil, beginErr + tx, err := ecr.db.Beginx() + if err != nil { + return nil, err } - // THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS - // WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO + cw := new(ipfs.CIDWrapper) cw.BlockNumber = big.NewInt(blockNumber) // Retrieve cached header CIDs if !streamFilters.HeaderFilter.Off { - var headersErr error - cw.Headers, headersErr = ecr.retrieveHeaderCIDs(tx, streamFilters, blockNumber) - if headersErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } log.Error("header cid retrieval error") - return nil, headersErr + return nil, err } if streamFilters.HeaderFilter.Uncles { - var unclesErr error - cw.Uncles, unclesErr = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber) - if unclesErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.Uncles, err = ecr.RetrieveUncleCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } log.Error("uncle cid retrieval error") - return nil, unclesErr + return nil, err } } } @@ -103,64 +104,58 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc // Retrieve cached trx CIDs var trxIds []int64 if !streamFilters.TrxFilter.Off { - var trxsErr error - cw.Transactions, trxIds, trxsErr = ecr.retrieveTrxCIDs(tx, streamFilters, blockNumber) - if trxsErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.Transactions, trxIds, err = ecr.RetrieveTrxCIDs(tx, streamFilters.TrxFilter, blockNumber) + if err != nil { + err := tx.Rollback() + if err != nil { + log.Error(err) } log.Error("transaction cid retrieval error") - return nil, trxsErr + return nil, err } } // Retrieve cached receipt CIDs if !streamFilters.ReceiptFilter.Off { - var rctsErr error - cw.Receipts, rctsErr = ecr.retrieveRctCIDs(tx, streamFilters, blockNumber, trxIds) - if rctsErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilters.ReceiptFilter, blockNumber, trxIds) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } log.Error("receipt cid retrieval error") - return nil, rctsErr + return nil, err } } // Retrieve cached state CIDs if !streamFilters.StateFilter.Off { - var stateErr error - cw.StateNodes, stateErr = ecr.retrieveStateCIDs(tx, streamFilters, blockNumber) - if stateErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilters.StateFilter, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } log.Error("state cid retrieval error") - return nil, stateErr + return nil, err } } // Retrieve cached storage CIDs if !streamFilters.StorageFilter.Off { - var storageErr error - cw.StorageNodes, storageErr = ecr.retrieveStorageCIDs(tx, streamFilters, blockNumber) - if storageErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilters.StorageFilter, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } log.Error("storage cid retrieval error") - return nil, storageErr + return nil, err } } return cw, tx.Commit() } -func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { +// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight +func (ecr *EthCIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) { log.Debug("retrieving header cids for block ", blockNumber) headers := make([]string, 0) pgStr := `SELECT cid FROM header_cids @@ -169,7 +164,8 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config return headers, err } -func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { +// RetrieveUncleCIDs retrieves and returns all of the uncle cids at the provided blockheight +func (ecr *EthCIDRetriever) RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) { log.Debug("retrieving header cids for block ", blockNumber) headers := make([]string, 0) pgStr := `SELECT cid FROM header_cids @@ -178,7 +174,9 @@ func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config. return headers, err } -func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, []int64, error) { +// RetrieveTrxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters +// also returns the ids for the returned transaction cids +func (ecr *EthCIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error) { log.Debug("retrieving transaction cids for block ", blockNumber) args := make([]interface{}, 0, 3) type result struct { @@ -189,13 +187,13 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Su pgStr := `SELECT transaction_cids.id, transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` args = append(args, blockNumber) - if len(streamFilters.TrxFilter.Dst) > 0 { + if len(txFilter.Dst) > 0 { pgStr += ` AND transaction_cids.dst = ANY($2::VARCHAR(66)[])` - args = append(args, pq.Array(streamFilters.TrxFilter.Dst)) + args = append(args, pq.Array(txFilter.Dst)) } - if len(streamFilters.TrxFilter.Src) > 0 { + if len(txFilter.Src) > 0 { pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])` - args = append(args, pq.Array(streamFilters.TrxFilter.Src)) + args = append(args, pq.Array(txFilter.Src)) } err := tx.Select(&results, pgStr, args...) if err != nil { @@ -210,7 +208,9 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Su return cids, ids, nil } -func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64, trxIds []int64) ([]string, error) { +// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided +// filter parameters and correspond to the provided tx ids +func (ecr *EthCIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 4) pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids @@ -218,13 +218,13 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su AND transaction_cids.header_id = header_cids.id AND header_cids.block_number = $1` args = append(args, blockNumber) - if len(streamFilters.ReceiptFilter.Topic0s) > 0 { + if len(rctFilter.Topic0s) > 0 { pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]` - args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s)) - if len(streamFilters.ReceiptFilter.Contracts) > 0 { + args = append(args, pq.Array(rctFilter.Topic0s)) + if len(rctFilter.Contracts) > 0 { pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[]))` - args = append(args, pq.Array(streamFilters.ReceiptFilter.Contracts)) - if len(trxIds) > 0 { + args = append(args, pq.Array(rctFilter.Contracts)) + if rctFilter.MatchTxs && len(trxIds) > 0 { pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))` args = append(args, pq.Array(trxIds)) } else { @@ -232,7 +232,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su } } else { pgStr += `)` - if len(trxIds) > 0 { + if rctFilter.MatchTxs && len(trxIds) > 0 { pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` args = append(args, pq.Array(trxIds)) } else { @@ -240,16 +240,16 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su } } } else { - if len(streamFilters.ReceiptFilter.Contracts) > 0 { + if len(rctFilter.Contracts) > 0 { pgStr += ` AND (receipt_cids.contract = ANY($2::VARCHAR(66)[])` - args = append(args, pq.Array(streamFilters.ReceiptFilter.Contracts)) - if len(trxIds) > 0 { + args = append(args, pq.Array(rctFilter.Contracts)) + if rctFilter.MatchTxs && len(trxIds) > 0 { pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` args = append(args, pq.Array(trxIds)) } else { pgStr += `)` } - } else if len(trxIds) > 0 { + } else if rctFilter.MatchTxs && len(trxIds) > 0 { pgStr += ` AND receipt_cids.tx_id = ANY($2::INTEGER[])` args = append(args, pq.Array(trxIds)) } @@ -259,22 +259,23 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su return receiptCids, err } -func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StateNodeCID, error) { +// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters +func (ecr *EthCIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error) { log.Debug("retrieving state cids for block ", blockNumber) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.block_number = $1` args = append(args, blockNumber) - addrLen := len(streamFilters.StateFilter.Addresses) + addrLen := len(stateFilter.Addresses) if addrLen > 0 { keys := make([]string, 0, addrLen) - for _, addr := range streamFilters.StateFilter.Addresses { + for _, addr := range stateFilter.Addresses { keys = append(keys, ipfs.HexToKey(addr).Hex()) } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) } - if !streamFilters.StorageFilter.IntermediateNodes { + if !stateFilter.IntermediateNodes { pgStr += ` AND state_cids.leaf = TRUE` } stateNodeCIDs := make([]ipfs.StateNodeCID, 0) @@ -282,7 +283,8 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config. return stateNodeCIDs, err } -func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StorageNodeCID, error) { +// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters +func (ecr *EthCIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.StorageFilter, blockNumber int64) ([]ipfs.StorageNodeCID, error) { log.Debug("retrieving storage cids for block ", blockNumber) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.leaf FROM storage_cids, state_cids, header_cids @@ -290,20 +292,23 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi AND state_cids.header_id = header_cids.id AND header_cids.block_number = $1` args = append(args, blockNumber) - addrLen := len(streamFilters.StorageFilter.Addresses) + addrLen := len(storageFilter.Addresses) if addrLen > 0 { keys := make([]string, 0, addrLen) - for _, addr := range streamFilters.StorageFilter.Addresses { + for _, addr := range storageFilter.Addresses { keys = append(keys, ipfs.HexToKey(addr).Hex()) } pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` args = append(args, pq.Array(keys)) + if len(storageFilter.StorageKeys) > 0 { + pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])` + args = append(args, pq.Array(storageFilter.StorageKeys)) + } + } else if len(storageFilter.StorageKeys) > 0 { + pgStr += ` AND storage_cids.storage_key = ANY($2::VARCHAR(66)[])` + args = append(args, pq.Array(storageFilter.StorageKeys)) } - if len(streamFilters.StorageFilter.StorageKeys) > 0 { - pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])` - args = append(args, pq.Array(streamFilters.StorageFilter.StorageKeys)) - } - if !streamFilters.StorageFilter.IntermediateNodes { + if !storageFilter.IntermediateNodes { pgStr += ` AND storage_cids.leaf = TRUE` } storageNodeCIDs := make([]ipfs.StorageNodeCID, 0) @@ -334,3 +339,7 @@ func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]uint64, error) { } return gapRanges, nil } + +func (ecr *EthCIDRetriever) Database() *postgres.DB { + return ecr.db +} diff --git a/pkg/super_node/retriever_test.go b/pkg/super_node/retriever_test.go index 3baef35d..1d04f3ab 100644 --- a/pkg/super_node/retriever_test.go +++ b/pkg/super_node/retriever_test.go @@ -126,6 +126,7 @@ var ( }, TrxFilter: config.TrxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter ReceiptFilter: config.ReceiptFilter{ + MatchTxs: true, Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, @@ -146,6 +147,7 @@ var ( Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt }, ReceiptFilter: config.ReceiptFilter{ + MatchTxs: true, Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, @@ -195,8 +197,8 @@ var _ = Describe("Retriever", func() { Describe("RetrieveCIDs", func() { BeforeEach(func() { - indexErr := repo.Index(mocks.MockCIDPayload) - Expect(indexErr).ToNot(HaveOccurred()) + err := repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) }) It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { cidWrapper, err := retriever.RetrieveCIDs(openFilter, 1) @@ -308,20 +310,20 @@ var _ = Describe("Retriever", func() { Describe("RetrieveFirstBlockNumber", func() { It("Gets the number of the first block that has data in the database", func() { - indexErr := repo.Index(mocks.MockCIDPayload) - Expect(indexErr).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveFirstBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) }) It("Gets the number of the first block that has data in the database", func() { payload := *mocks.MockCIDPayload payload.BlockNumber = "1010101" - indexErr := repo.Index(&payload) - Expect(indexErr).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveFirstBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) }) @@ -330,32 +332,32 @@ var _ = Describe("Retriever", func() { payload1.BlockNumber = "1010101" payload2 := payload1 payload2.BlockNumber = "5" - indexErr := repo.Index(&payload1) - Expect(indexErr).ToNot(HaveOccurred()) - indexErr2 := repo.Index(&payload2) - Expect(indexErr2).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveFirstBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveFirstBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(5))) }) }) Describe("RetrieveLastBlockNumber", func() { It("Gets the number of the latest block that has data in the database", func() { - indexErr := repo.Index(mocks.MockCIDPayload) - Expect(indexErr).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveLastBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1))) }) It("Gets the number of the latest block that has data in the database", func() { payload := *mocks.MockCIDPayload payload.BlockNumber = "1010101" - indexErr := repo.Index(&payload) - Expect(indexErr).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveLastBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) }) @@ -364,12 +366,12 @@ var _ = Describe("Retriever", func() { payload1.BlockNumber = "1010101" payload2 := payload1 payload2.BlockNumber = "5" - indexErr := repo.Index(&payload1) - Expect(indexErr).ToNot(HaveOccurred()) - indexErr2 := repo.Index(&payload2) - Expect(indexErr2).ToNot(HaveOccurred()) - num, retrieveErr := retriever.RetrieveLastBlockNumber() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + num, err := retriever.RetrieveLastBlockNumber() + Expect(err).ToNot(HaveOccurred()) Expect(num).To(Equal(int64(1010101))) }) }) @@ -380,24 +382,24 @@ var _ = Describe("Retriever", func() { payload1.BlockNumber = "2" payload2 := payload1 payload2.BlockNumber = "3" - indexErr1 := repo.Index(mocks.MockCIDPayload) - Expect(indexErr1).ToNot(HaveOccurred()) - indexErr2 := repo.Index(&payload1) - Expect(indexErr2).ToNot(HaveOccurred()) - indexErr3 := repo.Index(&payload2) - Expect(indexErr3).ToNot(HaveOccurred()) - gaps, retrieveErr := retriever.RetrieveGapsInData() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(mocks.MockCIDPayload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData() + Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(0)) }) It("Doesn't return the gap from 0 to the earliest block", func() { payload := *mocks.MockCIDPayload payload.BlockNumber = "5" - indexErr := repo.Index(&payload) - Expect(indexErr).ToNot(HaveOccurred()) - gaps, retrieveErr := retriever.RetrieveGapsInData() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData() + Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(0)) }) @@ -406,12 +408,12 @@ var _ = Describe("Retriever", func() { payload1.BlockNumber = "1010101" payload2 := payload1 payload2.BlockNumber = "5" - indexErr := repo.Index(&payload1) - Expect(indexErr).ToNot(HaveOccurred()) - indexErr2 := repo.Index(&payload2) - Expect(indexErr2).ToNot(HaveOccurred()) - gaps, retrieveErr := retriever.RetrieveGapsInData() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData() + Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(1)) Expect(gaps[0][0]).To(Equal(uint64(6))) Expect(gaps[0][1]).To(Equal(uint64(1010100))) @@ -430,20 +432,20 @@ var _ = Describe("Retriever", func() { payload5.BlockNumber = "102" payload6 := payload5 payload6.BlockNumber = "1000" - indexErr := repo.Index(&payload1) - Expect(indexErr).ToNot(HaveOccurred()) - indexErr2 := repo.Index(&payload2) - Expect(indexErr2).ToNot(HaveOccurred()) - indexErr3 := repo.Index(&payload3) - Expect(indexErr3).ToNot(HaveOccurred()) - indexErr4 := repo.Index(&payload4) - Expect(indexErr4).ToNot(HaveOccurred()) - indexErr5 := repo.Index(&payload5) - Expect(indexErr5).ToNot(HaveOccurred()) - indexErr6 := repo.Index(&payload6) - Expect(indexErr6).ToNot(HaveOccurred()) - gaps, retrieveErr := retriever.RetrieveGapsInData() - Expect(retrieveErr).ToNot(HaveOccurred()) + err := repo.Index(&payload1) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload2) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload3) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload4) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload5) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload6) + Expect(err).ToNot(HaveOccurred()) + gaps, err := retriever.RetrieveGapsInData() + Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(3)) Expect(super_node.ListContainsRange(gaps, [2]uint64{6, 99})).To(BeTrue()) Expect(super_node.ListContainsRange(gaps, [2]uint64{103, 999})).To(BeTrue())