expand retriever, fetcher, and resolver interfaces in prep for eth api mimick

This commit is contained in:
Ian Norden 2020-01-16 14:48:38 -06:00
parent 1b37e66d9d
commit 358575335b
10 changed files with 394 additions and 321 deletions

View File

@ -328,9 +328,12 @@ not send any transactions to the subscriber; `src` and `dst` are string arrays w
if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained if they have any addresses then the super-node will only send transactions that were sent or received by the addresses contained
in `src` and `dst`, respectively. in `src` and `dst`, respectively.
`subscription.receiptFilter` has two sub-options: `off` and `topics`. Setting `off` to true tells the super-node to `subscription.receiptFilter` has four sub-options: `off`, `topics`, `contracts` and `matchTxs`. Setting `off` to true tells the super-node to
not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for, not send any receipts to the subscriber; `topic0s` is a string array which can be filled with event topics we want to filter for,
if it has any topics then the super-node will only send receipts that contain logs which have that topic0. if it has any topics then the super-node will only send receipts that contain logs which have that topic0. Similarly, `contracts` is
a string array which can be filled with contract addresses we want to filter for, if it contains any contract addresses the super-node will
only send receipts that correspond to one of those contracts. `matchTrxs` is a bool which when set to true any receipts that correspond to filtered for
transactions will be sent by the super-node, regardless of whether or not the receipt satisfies the `topics` or `contracts` filters.
`subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to `subscription.stateFilter` has three sub-options: `off`, `addresses`, and `intermediateNodes`. Setting `off` to true tells the super-node to
not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for, not send any state data to the subscriber; `addresses` is a string array which can be filled with ETH addresses we want to filter state for,

View File

@ -44,6 +44,7 @@ type TrxFilter struct {
type ReceiptFilter struct { type ReceiptFilter struct {
Off bool Off bool
MatchTxs bool // turn on to retrieve receipts that pair with retrieved transactions
Contracts []string Contracts []string
Topic0s []string Topic0s []string
} }

View File

@ -18,6 +18,7 @@ package ipfs
import ( import (
"context" "context"
"errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format" "github.com/ipfs/go-block-format"
@ -26,9 +27,19 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var (
errUnexpectedNumberOfIPLDs = errors.New("ipfs batch fetch returned unexpected number of IPLDs")
)
// IPLDFetcher is an interface for fetching IPLDs // IPLDFetcher is an interface for fetching IPLDs
type IPLDFetcher interface { type IPLDFetcher interface {
FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error)
FetchHeaders(cids []string) ([]blocks.Block, error)
FetchUncles(cids []string) ([]blocks.Block, error)
FetchTrxs(cids []string) ([]blocks.Block, error)
FetchRcts(cids []string) ([]blocks.Block, error)
FetchState(cids []StateNodeCID) (map[common.Hash]blocks.Block, error)
FetchStorage(cids []StorageNodeCID) (map[common.Hash]map[common.Hash]blocks.Block, error)
} }
// EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS // EthIPLDFetcher is used to fetch ETH IPLD objects from IPFS
@ -51,165 +62,163 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
func (f *EthIPLDFetcher) FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) { func (f *EthIPLDFetcher) FetchIPLDs(cids CIDWrapper) (*IPLDWrapper, error) {
log.Debug("fetching iplds") log.Debug("fetching iplds")
blocks := &IPLDWrapper{ iplds := new(IPLDWrapper)
BlockNumber: cids.BlockNumber, iplds.BlockNumber = cids.BlockNumber
Headers: make([]blocks.Block, 0), var err error
Uncles: make([]blocks.Block, 0), iplds.Headers, err = f.FetchHeaders(cids.Headers)
Transactions: make([]blocks.Block, 0), if err != nil {
Receipts: make([]blocks.Block, 0), return nil, err
StateNodes: make(map[common.Hash]blocks.Block), }
StorageNodes: make(map[common.Hash]map[common.Hash]blocks.Block), iplds.Uncles, err = f.FetchUncles(cids.Uncles)
if err != nil {
return nil, err
}
iplds.Transactions, err = f.FetchTrxs(cids.Transactions)
if err != nil {
return nil, err
}
iplds.Receipts, err = f.FetchRcts(cids.Receipts)
if err != nil {
return nil, err
}
iplds.StateNodes, err = f.FetchState(cids.StateNodes)
if err != nil {
return nil, err
}
iplds.StorageNodes, err = f.FetchStorage(cids.StorageNodes)
if err != nil {
return nil, err
}
return iplds, nil
} }
headersErr := f.fetchHeaders(cids, blocks) // FetchHeaders fetches headers
if headersErr != nil {
return nil, headersErr
}
unclesErr := f.fetchUncles(cids, blocks)
if unclesErr != nil {
return nil, unclesErr
}
trxsErr := f.fetchTrxs(cids, blocks)
if trxsErr != nil {
return nil, trxsErr
}
rctsErr := f.fetchRcts(cids, blocks)
if rctsErr != nil {
return nil, rctsErr
}
storageErr := f.fetchStorage(cids, blocks)
if storageErr != nil {
return nil, storageErr
}
stateErr := f.fetchState(cids, blocks)
if stateErr != nil {
return nil, stateErr
}
return blocks, nil
}
// fetchHeaders fetches headers
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchHeaders(cids CIDWrapper, blocks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchHeaders(cids []string) ([]blocks.Block, error) {
log.Debug("fetching header iplds") log.Debug("fetching header iplds")
headerCids := make([]cid.Cid, 0, len(cids.Headers)) headerCids := make([]cid.Cid, 0, len(cids))
for _, c := range cids.Headers { for _, c := range cids {
dc, err := cid.Decode(c) dc, err := cid.Decode(c)
if err != nil { if err != nil {
return err return nil, err
} }
headerCids = append(headerCids, dc) headerCids = append(headerCids, dc)
} }
blocks.Headers = f.fetchBatch(headerCids) headers := f.fetchBatch(headerCids)
if len(blocks.Headers) != len(headerCids) { if len(headers) != len(headerCids) {
log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(blocks.Headers), len(headerCids)) log.Errorf("ipfs fetcher: number of header blocks returned (%d) does not match number expected (%d)", len(headers), len(headerCids))
return headers, errUnexpectedNumberOfIPLDs
} }
return nil return headers, nil
} }
// fetchUncles fetches uncles // FetchUncles fetches uncles
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchUncles(cids CIDWrapper, blocks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchUncles(cids []string) ([]blocks.Block, error) {
log.Debug("fetching uncle iplds") log.Debug("fetching uncle iplds")
uncleCids := make([]cid.Cid, 0, len(cids.Uncles)) uncleCids := make([]cid.Cid, 0, len(cids))
for _, c := range cids.Uncles { for _, c := range cids {
dc, err := cid.Decode(c) dc, err := cid.Decode(c)
if err != nil { if err != nil {
return err return nil, err
} }
uncleCids = append(uncleCids, dc) uncleCids = append(uncleCids, dc)
} }
blocks.Uncles = f.fetchBatch(uncleCids) uncles := f.fetchBatch(uncleCids)
if len(blocks.Uncles) != len(uncleCids) { if len(uncles) != len(uncleCids) {
log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(blocks.Uncles), len(uncleCids)) log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids))
return uncles, errUnexpectedNumberOfIPLDs
} }
return nil return uncles, nil
} }
// fetchTrxs fetches transactions // FetchTrxs fetches transactions
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchTrxs(cids CIDWrapper, blocks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchTrxs(cids []string) ([]blocks.Block, error) {
log.Debug("fetching transaction iplds") log.Debug("fetching transaction iplds")
trxCids := make([]cid.Cid, 0, len(cids.Transactions)) trxCids := make([]cid.Cid, 0, len(cids))
for _, c := range cids.Transactions { for _, c := range cids {
dc, err := cid.Decode(c) dc, err := cid.Decode(c)
if err != nil { if err != nil {
return err return nil, err
} }
trxCids = append(trxCids, dc) trxCids = append(trxCids, dc)
} }
blocks.Transactions = f.fetchBatch(trxCids) trxs := f.fetchBatch(trxCids)
if len(blocks.Transactions) != len(trxCids) { if len(trxs) != len(trxCids) {
log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(blocks.Transactions), len(trxCids)) log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids))
return trxs, errUnexpectedNumberOfIPLDs
} }
return nil return trxs, nil
} }
// fetchRcts fetches receipts // FetchRcts fetches receipts
// It uses the f.fetchBatch method // It uses the f.fetchBatch method
func (f *EthIPLDFetcher) fetchRcts(cids CIDWrapper, blocks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchRcts(cids []string) ([]blocks.Block, error) {
log.Debug("fetching receipt iplds") log.Debug("fetching receipt iplds")
rctCids := make([]cid.Cid, 0, len(cids.Receipts)) rctCids := make([]cid.Cid, 0, len(cids))
for _, c := range cids.Receipts { for _, c := range cids {
dc, err := cid.Decode(c) dc, err := cid.Decode(c)
if err != nil { if err != nil {
return err return nil, err
} }
rctCids = append(rctCids, dc) rctCids = append(rctCids, dc)
} }
blocks.Receipts = f.fetchBatch(rctCids) rcts := f.fetchBatch(rctCids)
if len(blocks.Receipts) != len(rctCids) { if len(rcts) != len(rctCids) {
log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(blocks.Receipts), len(rctCids)) log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids))
return rcts, errUnexpectedNumberOfIPLDs
} }
return nil return rcts, nil
} }
// fetchState fetches state nodes // FetchState fetches state nodes
// It uses the single f.fetch method instead of the batch fetch, because it // It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state keys // needs to maintain the data's relation to state keys
func (f *EthIPLDFetcher) fetchState(cids CIDWrapper, blocks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchState(cids []StateNodeCID) (map[common.Hash]blocks.Block, error) {
log.Debug("fetching state iplds") log.Debug("fetching state iplds")
for _, stateNode := range cids.StateNodes { stateNodes := make(map[common.Hash]blocks.Block)
for _, stateNode := range cids {
if stateNode.CID == "" || stateNode.Key == "" { if stateNode.CID == "" || stateNode.Key == "" {
continue continue
} }
dc, decodeErr := cid.Decode(stateNode.CID) dc, err := cid.Decode(stateNode.CID)
if decodeErr != nil { if err != nil {
return decodeErr return nil, err
} }
block, fetchErr := f.fetch(dc) state, err := f.fetch(dc)
if fetchErr != nil { if err != nil {
return fetchErr return nil, err
} }
blocks.StateNodes[common.HexToHash(stateNode.Key)] = block stateNodes[common.HexToHash(stateNode.Key)] = state
} }
return nil return stateNodes, nil
} }
// fetchStorage fetches storage nodes // FetchStorage fetches storage nodes
// It uses the single f.fetch method instead of the batch fetch, because it // It uses the single f.fetch method instead of the batch fetch, because it
// needs to maintain the data's relation to state and storage keys // needs to maintain the data's relation to state and storage keys
func (f *EthIPLDFetcher) fetchStorage(cids CIDWrapper, blks *IPLDWrapper) error { func (f *EthIPLDFetcher) FetchStorage(cids []StorageNodeCID) (map[common.Hash]map[common.Hash]blocks.Block, error) {
log.Debug("fetching storage iplds") log.Debug("fetching storage iplds")
for _, storageNode := range cids.StorageNodes { storageNodes := make(map[common.Hash]map[common.Hash]blocks.Block)
for _, storageNode := range cids {
if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" { if storageNode.CID == "" || storageNode.Key == "" || storageNode.StateKey == "" {
continue continue
} }
dc, decodeErr := cid.Decode(storageNode.CID) dc, err := cid.Decode(storageNode.CID)
if decodeErr != nil { if err != nil {
return decodeErr return nil, err
} }
blk, fetchErr := f.fetch(dc) storage, err := f.fetch(dc)
if fetchErr != nil { if err != nil {
return fetchErr return nil, err
} }
if blks.StorageNodes[common.HexToHash(storageNode.StateKey)] == nil { if storageNodes[common.HexToHash(storageNode.StateKey)] == nil {
blks.StorageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block) storageNodes[common.HexToHash(storageNode.StateKey)] = make(map[common.Hash]blocks.Block)
} }
blks.StorageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = blk storageNodes[common.HexToHash(storageNode.StateKey)][common.HexToHash(storageNode.Key)] = storage
} }
return nil return storageNodes, nil
} }
// fetch is used to fetch a single cid // fetch is used to fetch a single cid

View File

@ -312,6 +312,7 @@ var (
MockSeeNodePayload = streamer.SuperNodePayload{ MockSeeNodePayload = streamer.SuperNodePayload{
BlockNumber: big.NewInt(1), BlockNumber: big.NewInt(1),
HeadersRlp: [][]byte{MockHeaderRlp}, HeadersRlp: [][]byte{MockHeaderRlp},
UnclesRlp: [][]byte{},
TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, TransactionsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},
ReceiptsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)}, ReceiptsRlp: [][]byte{MockTransactions.GetRlp(0), MockTransactions.GetRlp(1)},
StateNodesRlp: map[common.Hash][]byte{ StateNodesRlp: map[common.Hash][]byte{

View File

@ -25,6 +25,12 @@ import (
// IPLDResolver is the interface to resolving IPLDs // IPLDResolver is the interface to resolving IPLDs
type IPLDResolver interface { type IPLDResolver interface {
ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload
ResolveHeaders(iplds []blocks.Block) [][]byte
ResolveUncles(iplds []blocks.Block) [][]byte
ResolveTransactions(iplds []blocks.Block) [][]byte
ResolveReceipts(blocks []blocks.Block) [][]byte
ResolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte
ResolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte
} }
// EthIPLDResolver is the underlying struct to support the IPLDResolver interface // EthIPLDResolver is the underlying struct to support the IPLDResolver interface
@ -37,61 +43,64 @@ func NewIPLDResolver() *EthIPLDResolver {
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload { func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IPLDWrapper) streamer.SuperNodePayload {
response := &streamer.SuperNodePayload{ return streamer.SuperNodePayload{
BlockNumber: ipfsBlocks.BlockNumber, BlockNumber: ipfsBlocks.BlockNumber,
StateNodesRlp: make(map[common.Hash][]byte), HeadersRlp: eir.ResolveHeaders(ipfsBlocks.Headers),
StorageNodesRlp: make(map[common.Hash]map[common.Hash][]byte), UnclesRlp: eir.ResolveUncles(ipfsBlocks.Uncles),
} TransactionsRlp: eir.ResolveTransactions(ipfsBlocks.Transactions),
eir.resolveHeaders(ipfsBlocks.Headers, response) ReceiptsRlp: eir.ResolveReceipts(ipfsBlocks.Receipts),
eir.resolveUncles(ipfsBlocks.Uncles, response) StateNodesRlp: eir.ResolveState(ipfsBlocks.StateNodes),
eir.resolveTransactions(ipfsBlocks.Transactions, response) StorageNodesRlp: eir.ResolveStorage(ipfsBlocks.StorageNodes),
eir.resolveReceipts(ipfsBlocks.Receipts, response)
eir.resolveState(ipfsBlocks.StateNodes, response)
eir.resolveStorage(ipfsBlocks.StorageNodes, response)
return *response
}
func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *streamer.SuperNodePayload) {
for _, block := range blocks {
raw := block.RawData()
response.HeadersRlp = append(response.HeadersRlp, raw)
} }
} }
func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *streamer.SuperNodePayload) { func (eir *EthIPLDResolver) ResolveHeaders(iplds []blocks.Block) [][]byte {
for _, block := range blocks { headerRlps := make([][]byte, 0, len(iplds))
raw := block.RawData() for _, ipld := range iplds {
response.UnclesRlp = append(response.UnclesRlp, raw) headerRlps = append(headerRlps, ipld.RawData())
} }
return headerRlps
} }
func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *streamer.SuperNodePayload) { func (eir *EthIPLDResolver) ResolveUncles(iplds []blocks.Block) [][]byte {
for _, block := range blocks { uncleRlps := make([][]byte, 0, len(iplds))
raw := block.RawData() for _, ipld := range iplds {
response.TransactionsRlp = append(response.TransactionsRlp, raw) uncleRlps = append(uncleRlps, ipld.RawData())
} }
return uncleRlps
} }
func (eir *EthIPLDResolver) resolveReceipts(blocks []blocks.Block, response *streamer.SuperNodePayload) { func (eir *EthIPLDResolver) ResolveTransactions(iplds []blocks.Block) [][]byte {
for _, block := range blocks { trxs := make([][]byte, 0, len(iplds))
raw := block.RawData() for _, ipld := range iplds {
response.ReceiptsRlp = append(response.ReceiptsRlp, raw) trxs = append(trxs, ipld.RawData())
} }
return trxs
} }
func (eir *EthIPLDResolver) resolveState(blocks map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { func (eir *EthIPLDResolver) ResolveReceipts(iplds []blocks.Block) [][]byte {
for key, block := range blocks { rcts := make([][]byte, 0, len(iplds))
raw := block.RawData() for _, ipld := range iplds {
response.StateNodesRlp[key] = raw rcts = append(rcts, ipld.RawData())
} }
return rcts
} }
func (eir *EthIPLDResolver) resolveStorage(blocks map[common.Hash]map[common.Hash]blocks.Block, response *streamer.SuperNodePayload) { func (eir *EthIPLDResolver) ResolveState(iplds map[common.Hash]blocks.Block) map[common.Hash][]byte {
for stateKey, storageBlocks := range blocks { stateNodes := make(map[common.Hash][]byte, len(iplds))
response.StorageNodesRlp[stateKey] = make(map[common.Hash][]byte) for key, ipld := range iplds {
for storageKey, storageVal := range storageBlocks { stateNodes[key] = ipld.RawData()
raw := storageVal.RawData() }
response.StorageNodesRlp[stateKey][storageKey] = raw return stateNodes
}
func (eir *EthIPLDResolver) ResolveStorage(iplds map[common.Hash]map[common.Hash]blocks.Block) map[common.Hash]map[common.Hash][]byte {
storageNodes := make(map[common.Hash]map[common.Hash][]byte)
for stateKey, storageIPLDs := range iplds {
storageNodes[stateKey] = make(map[common.Hash][]byte)
for storageKey, storageVal := range storageIPLDs {
storageNodes[stateKey][storageKey] = storageVal.RawData()
} }
} }
return storageNodes
} }

View File

@ -43,35 +43,35 @@ func NewResponseFilterer() *Filterer {
// FilterResponse is used to filter through eth data to extract and package requested data into a Payload // FilterResponse is used to filter through eth data to extract and package requested data into a Payload
func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) { func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipfs.IPLDPayload) (streamer.SuperNodePayload, error) {
if checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response := new(streamer.SuperNodePayload) response := new(streamer.SuperNodePayload)
headersErr := s.filterHeaders(streamFilters, response, payload) if err := s.filterHeaders(streamFilters.HeaderFilter, response, payload); err != nil {
if headersErr != nil { return streamer.SuperNodePayload{}, err
return streamer.SuperNodePayload{}, headersErr
} }
txHashes, trxsErr := s.filterTransactions(streamFilters, response, payload) txHashes, err := s.filterTransactions(streamFilters.TrxFilter, response, payload)
if trxsErr != nil { if err != nil {
return streamer.SuperNodePayload{}, trxsErr return streamer.SuperNodePayload{}, err
} }
rctsErr := s.filerReceipts(streamFilters, response, payload, txHashes) if err := s.filerReceipts(streamFilters.ReceiptFilter, response, payload, txHashes); err != nil {
if rctsErr != nil { return streamer.SuperNodePayload{}, err
return streamer.SuperNodePayload{}, rctsErr
} }
stateErr := s.filterState(streamFilters, response, payload) if err := s.filterState(streamFilters.StateFilter, response, payload); err != nil {
if stateErr != nil { return streamer.SuperNodePayload{}, err
return streamer.SuperNodePayload{}, stateErr
} }
storageErr := s.filterStorage(streamFilters, response, payload) if err := s.filterStorage(streamFilters.StorageFilter, response, payload); err != nil {
if storageErr != nil { return streamer.SuperNodePayload{}, err
return streamer.SuperNodePayload{}, storageErr
} }
response.BlockNumber = payload.BlockNumber response.BlockNumber = payload.BlockNumber
return *response, nil return *response, nil
} }
return streamer.SuperNodePayload{}, nil
}
func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterHeaders(headerFilter config.HeaderFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !headerFilter.Off {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if streamFilters.HeaderFilter.Uncles { if headerFilter.Uncles {
response.UnclesRlp = make([][]byte, 0, len(payload.BlockBody.Uncles))
for _, uncle := range payload.BlockBody.Uncles { for _, uncle := range payload.BlockBody.Uncles {
uncleRlp, err := rlp.EncodeToBytes(uncle) uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil { if err != nil {
@ -91,11 +91,11 @@ func checkRange(start, end, actual int64) bool {
return false return false
} }
func (s *Filterer) filterTransactions(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) { func (s *Filterer) filterTransactions(trxFilter config.TrxFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions)) trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !trxFilter.Off {
for i, trx := range payload.BlockBody.Transactions { for i, trx := range payload.BlockBody.Transactions {
if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) { if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) {
trxBuffer := new(bytes.Buffer) trxBuffer := new(bytes.Buffer)
err := trx.EncodeRLP(trxBuffer) err := trx.EncodeRLP(trxBuffer)
if err != nil { if err != nil {
@ -127,10 +127,10 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false return false
} }
func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error { func (s *Filterer) filerReceipts(receiptFilter config.ReceiptFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !receiptFilter.Off {
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { if checkReceipts(receipt, receiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, receiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes, receiptFilter.MatchTxs) {
receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptForStorage := (*types.ReceiptForStorage)(receipt)
receiptBuffer := new(bytes.Buffer) receiptBuffer := new(bytes.Buffer)
err := receiptForStorage.EncodeRLP(receiptBuffer) err := receiptForStorage.EncodeRLP(receiptBuffer)
@ -144,17 +144,19 @@ func (s *Filterer) filerReceipts(streamFilters config.Subscription, response *st
return nil return nil
} }
func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool { func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash, matchTxs bool) bool {
// If we aren't filtering for any topics or contracts, all topics are a go // If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go
if len(wantedTopics) == 0 && len(wantedContracts) == 0 { if len(wantedTopics) == 0 && len(wantedContracts) == 0 && (len(wantedTrxHashes) == 0 || !matchTxs) {
return true return true
} }
// No matter what filters we have, we keep receipts for the trxs we are interested in // No matter what filters we have, we keep receipts for specific trxs we are interested in
if matchTxs {
for _, wantedTrxHash := range wantedTrxHashes { for _, wantedTrxHash := range wantedTrxHashes {
if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) {
return true return true
} }
} }
}
if len(wantedContracts) == 0 { if len(wantedContracts) == 0 {
// We keep all receipts that have logs we are interested in // We keep all receipts that have logs we are interested in
@ -165,7 +167,7 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac
} }
} }
} }
} else { // We keep receipts that belong to one of the specified contracts and have logs with topics if we aren't filtering on topics } else { // We keep all receipts that belong to one of the specified contracts if we aren't filtering on topics
for _, wantedContract := range wantedContracts { for _, wantedContract := range wantedContracts {
if wantedContract == actualContract { if wantedContract == actualContract {
if len(wantedTopics) == 0 { if len(wantedTopics) == 0 {
@ -186,17 +188,17 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContrac
return false return false
} }
func (s *Filterer) filterState(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterState(stateFilter config.StateFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !stateFilter.Off {
response.StateNodesRlp = make(map[common.Hash][]byte) response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses)) keyFilters := make([]common.Hash, 0, len(stateFilter.Addresses))
for _, addr := range streamFilters.StateFilter.Addresses { for _, addr := range stateFilter.Addresses {
keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) keyFilter := ipfs.AddressToKey(common.HexToAddress(addr))
keyFilters = append(keyFilters, keyFilter) keyFilters = append(keyFilters, keyFilter)
} }
for key, stateNode := range payload.StateNodes { for key, stateNode := range payload.StateNodes {
if checkNodeKeys(keyFilters, key) { if checkNodeKeys(keyFilters, key) {
if stateNode.Leaf || streamFilters.StateFilter.IntermediateNodes { if stateNode.Leaf || stateFilter.IntermediateNodes {
response.StateNodesRlp[key] = stateNode.Value response.StateNodesRlp[key] = stateNode.Value
} }
} }
@ -218,16 +220,16 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false return false
} }
func (s *Filterer) filterStorage(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterStorage(storageFilter config.StorageFilter, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !storageFilter.Off {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses)) stateKeyFilters := make([]common.Hash, 0, len(storageFilter.Addresses))
for _, addr := range streamFilters.StorageFilter.Addresses { for _, addr := range storageFilter.Addresses {
keyFilter := ipfs.AddressToKey(common.HexToAddress(addr)) keyFilter := ipfs.AddressToKey(common.HexToAddress(addr))
stateKeyFilters = append(stateKeyFilters, keyFilter) stateKeyFilters = append(stateKeyFilters, keyFilter)
} }
storageKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.StorageKeys)) storageKeyFilters := make([]common.Hash, 0, len(storageFilter.StorageKeys))
for _, store := range streamFilters.StorageFilter.StorageKeys { for _, store := range storageFilter.StorageKeys {
keyFilter := ipfs.HexToKey(store) keyFilter := ipfs.HexToKey(store)
storageKeyFilters = append(storageKeyFilters, keyFilter) storageKeyFilters = append(storageKeyFilters, keyFilter)
} }

View File

@ -46,7 +46,8 @@ var _ = Describe("Filterer", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) var unclesRlp [][]byte
Expect(superNodePayload.UnclesRlp).To(Equal(unclesRlp))
Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2)) Expect(len(superNodePayload.TransactionsRlp)).To(Equal(2))
Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(0))).To(BeTrue())
Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue()) Expect(super_node.ListContainsBytes(superNodePayload.TransactionsRlp, mocks.MockTransactions.GetRlp(1))).To(BeTrue())

View File

@ -1,7 +1,9 @@
package mocks package mocks
import ( import (
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/config" "github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
) )
@ -19,6 +21,36 @@ func (*MockCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNu
panic("implement me") panic("implement me")
} }
// RetrieveHeaderCIDs mock method
func (*MockCIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) {
panic("implement me")
}
// RetrieveUncleCIDs mock method
func (*MockCIDRetriever) RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) {
panic("implement me")
}
// RetrieveTrxCIDs mock method
func (*MockCIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error) {
panic("implement me")
}
// RetrieveRctCIDs mock method
func (*MockCIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error) {
panic("implement me")
}
// RetrieveStateCIDs mock method
func (*MockCIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error) {
panic("implement me")
}
// RetrieveLastBlockNumber mock method // RetrieveLastBlockNumber mock method
func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) { func (*MockCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
panic("implement me") panic("implement me")
@ -42,3 +74,7 @@ func (mcr *MockCIDRetriever) SetGapsToRetrieve(gaps [][2]uint64) {
} }
mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...) mcr.GapsToRetrieve = append(mcr.GapsToRetrieve, gaps...)
} }
func (mcr *MockCIDRetriever) Database() *postgres.DB {
panic("implement me")
}

View File

@ -34,6 +34,12 @@ type CIDRetriever interface {
RetrieveLastBlockNumber() (int64, error) RetrieveLastBlockNumber() (int64, error)
RetrieveFirstBlockNumber() (int64, error) RetrieveFirstBlockNumber() (int64, error)
RetrieveGapsInData() ([][2]uint64, error) RetrieveGapsInData() ([][2]uint64, error)
RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error)
RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error)
RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error)
RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error)
RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error)
Database() *postgres.DB
} }
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
@ -65,37 +71,32 @@ func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) { func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*ipfs.CIDWrapper, error) {
log.Debug("retrieving cids") log.Debug("retrieving cids")
tx, beginErr := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if beginErr != nil { if err != nil {
return nil, beginErr return nil, err
} }
// THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS
// WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO
cw := new(ipfs.CIDWrapper) cw := new(ipfs.CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber) cw.BlockNumber = big.NewInt(blockNumber)
// Retrieve cached header CIDs // Retrieve cached header CIDs
if !streamFilters.HeaderFilter.Off { if !streamFilters.HeaderFilter.Off {
var headersErr error cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
cw.Headers, headersErr = ecr.retrieveHeaderCIDs(tx, streamFilters, blockNumber) if err != nil {
if headersErr != nil { if err := tx.Rollback(); err != nil {
rollbackErr := tx.Rollback() log.Error(err)
if rollbackErr != nil {
log.Error(rollbackErr)
} }
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return nil, headersErr return nil, err
} }
if streamFilters.HeaderFilter.Uncles { if streamFilters.HeaderFilter.Uncles {
var unclesErr error cw.Uncles, err = ecr.RetrieveUncleCIDs(tx, blockNumber)
cw.Uncles, unclesErr = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber) if err != nil {
if unclesErr != nil { if err := tx.Rollback(); err != nil {
rollbackErr := tx.Rollback() log.Error(err)
if rollbackErr != nil {
log.Error(rollbackErr)
} }
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
return nil, unclesErr return nil, err
} }
} }
} }
@ -103,64 +104,58 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc
// Retrieve cached trx CIDs // Retrieve cached trx CIDs
var trxIds []int64 var trxIds []int64
if !streamFilters.TrxFilter.Off { if !streamFilters.TrxFilter.Off {
var trxsErr error cw.Transactions, trxIds, err = ecr.RetrieveTrxCIDs(tx, streamFilters.TrxFilter, blockNumber)
cw.Transactions, trxIds, trxsErr = ecr.retrieveTrxCIDs(tx, streamFilters, blockNumber) if err != nil {
if trxsErr != nil { err := tx.Rollback()
rollbackErr := tx.Rollback() if err != nil {
if rollbackErr != nil { log.Error(err)
log.Error(rollbackErr)
} }
log.Error("transaction cid retrieval error") log.Error("transaction cid retrieval error")
return nil, trxsErr return nil, err
} }
} }
// Retrieve cached receipt CIDs // Retrieve cached receipt CIDs
if !streamFilters.ReceiptFilter.Off { if !streamFilters.ReceiptFilter.Off {
var rctsErr error cw.Receipts, err = ecr.RetrieveRctCIDs(tx, streamFilters.ReceiptFilter, blockNumber, trxIds)
cw.Receipts, rctsErr = ecr.retrieveRctCIDs(tx, streamFilters, blockNumber, trxIds) if err != nil {
if rctsErr != nil { if err := tx.Rollback(); err != nil {
rollbackErr := tx.Rollback() log.Error(err)
if rollbackErr != nil {
log.Error(rollbackErr)
} }
log.Error("receipt cid retrieval error") log.Error("receipt cid retrieval error")
return nil, rctsErr return nil, err
} }
} }
// Retrieve cached state CIDs // Retrieve cached state CIDs
if !streamFilters.StateFilter.Off { if !streamFilters.StateFilter.Off {
var stateErr error cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilters.StateFilter, blockNumber)
cw.StateNodes, stateErr = ecr.retrieveStateCIDs(tx, streamFilters, blockNumber) if err != nil {
if stateErr != nil { if err := tx.Rollback(); err != nil {
rollbackErr := tx.Rollback() log.Error(err)
if rollbackErr != nil {
log.Error(rollbackErr)
} }
log.Error("state cid retrieval error") log.Error("state cid retrieval error")
return nil, stateErr return nil, err
} }
} }
// Retrieve cached storage CIDs // Retrieve cached storage CIDs
if !streamFilters.StorageFilter.Off { if !streamFilters.StorageFilter.Off {
var storageErr error cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilters.StorageFilter, blockNumber)
cw.StorageNodes, storageErr = ecr.retrieveStorageCIDs(tx, streamFilters, blockNumber) if err != nil {
if storageErr != nil { if err := tx.Rollback(); err != nil {
rollbackErr := tx.Rollback() log.Error(err)
if rollbackErr != nil {
log.Error(rollbackErr)
} }
log.Error("storage cid retrieval error") log.Error("storage cid retrieval error")
return nil, storageErr return nil, err
} }
} }
return cw, tx.Commit() return cw, tx.Commit()
} }
func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
func (ecr *EthCIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) {
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]string, 0) headers := make([]string, 0)
pgStr := `SELECT cid FROM header_cids pgStr := `SELECT cid FROM header_cids
@ -169,7 +164,8 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config
return headers, err return headers, err
} }
func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { // RetrieveUncleCIDs retrieves and returns all of the uncle cids at the provided blockheight
func (ecr *EthCIDRetriever) RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]string, error) {
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]string, 0) headers := make([]string, 0)
pgStr := `SELECT cid FROM header_cids pgStr := `SELECT cid FROM header_cids
@ -178,7 +174,9 @@ func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.
return headers, err return headers, err
} }
func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, []int64, error) { // RetrieveTrxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *EthCIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TrxFilter, blockNumber int64) ([]string, []int64, error) {
log.Debug("retrieving transaction cids for block ", blockNumber) log.Debug("retrieving transaction cids for block ", blockNumber)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
type result struct { type result struct {
@ -189,13 +187,13 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Su
pgStr := `SELECT transaction_cids.id, transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id) pgStr := `SELECT transaction_cids.id, transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1` WHERE header_cids.block_number = $1`
args = append(args, blockNumber) args = append(args, blockNumber)
if len(streamFilters.TrxFilter.Dst) > 0 { if len(txFilter.Dst) > 0 {
pgStr += ` AND transaction_cids.dst = ANY($2::VARCHAR(66)[])` pgStr += ` AND transaction_cids.dst = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(streamFilters.TrxFilter.Dst)) args = append(args, pq.Array(txFilter.Dst))
} }
if len(streamFilters.TrxFilter.Src) > 0 { if len(txFilter.Src) > 0 {
pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])` pgStr += ` AND transaction_cids.src = ANY($3::VARCHAR(66)[])`
args = append(args, pq.Array(streamFilters.TrxFilter.Src)) args = append(args, pq.Array(txFilter.Src))
} }
err := tx.Select(&results, pgStr, args...) err := tx.Select(&results, pgStr, args...)
if err != nil { if err != nil {
@ -210,7 +208,9 @@ func (ecr *EthCIDRetriever) retrieveTrxCIDs(tx *sqlx.Tx, streamFilters config.Su
return cids, ids, nil return cids, ids, nil
} }
func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64, trxIds []int64) ([]string, error) { // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *EthCIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, trxIds []int64) ([]string, error) {
log.Debug("retrieving receipt cids for block ", blockNumber) log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 4) args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids pgStr := `SELECT receipt_cids.cid FROM receipt_cids, transaction_cids, header_cids
@ -218,13 +218,13 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
AND transaction_cids.header_id = header_cids.id AND transaction_cids.header_id = header_cids.id
AND header_cids.block_number = $1` AND header_cids.block_number = $1`
args = append(args, blockNumber) args = append(args, blockNumber)
if len(streamFilters.ReceiptFilter.Topic0s) > 0 { if len(rctFilter.Topic0s) > 0 {
pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]` pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]`
args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s)) args = append(args, pq.Array(rctFilter.Topic0s))
if len(streamFilters.ReceiptFilter.Contracts) > 0 { if len(rctFilter.Contracts) > 0 {
pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[]))` pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[]))`
args = append(args, pq.Array(streamFilters.ReceiptFilter.Contracts)) args = append(args, pq.Array(rctFilter.Contracts))
if len(trxIds) > 0 { if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))` pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))`
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} else { } else {
@ -232,7 +232,7 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
} }
} else { } else {
pgStr += `)` pgStr += `)`
if len(trxIds) > 0 { if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))`
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} else { } else {
@ -240,16 +240,16 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
} }
} }
} else { } else {
if len(streamFilters.ReceiptFilter.Contracts) > 0 { if len(rctFilter.Contracts) > 0 {
pgStr += ` AND (receipt_cids.contract = ANY($2::VARCHAR(66)[])` pgStr += ` AND (receipt_cids.contract = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(streamFilters.ReceiptFilter.Contracts)) args = append(args, pq.Array(rctFilter.Contracts))
if len(trxIds) > 0 { if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))`
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} else { } else {
pgStr += `)` pgStr += `)`
} }
} else if len(trxIds) > 0 { } else if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += ` AND receipt_cids.tx_id = ANY($2::INTEGER[])` pgStr += ` AND receipt_cids.tx_id = ANY($2::INTEGER[])`
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} }
@ -259,22 +259,23 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
return receiptCids, err return receiptCids, err
} }
func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StateNodeCID, error) { // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *EthCIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]ipfs.StateNodeCID, error) {
log.Debug("retrieving state cids for block ", blockNumber) log.Debug("retrieving state cids for block ", blockNumber)
args := make([]interface{}, 0, 2) args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id) pgStr := `SELECT state_cids.cid, state_cids.state_key, state_cids.leaf FROM state_cids INNER JOIN header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1` WHERE header_cids.block_number = $1`
args = append(args, blockNumber) args = append(args, blockNumber)
addrLen := len(streamFilters.StateFilter.Addresses) addrLen := len(stateFilter.Addresses)
if addrLen > 0 { if addrLen > 0 {
keys := make([]string, 0, addrLen) keys := make([]string, 0, addrLen)
for _, addr := range streamFilters.StateFilter.Addresses { for _, addr := range stateFilter.Addresses {
keys = append(keys, ipfs.HexToKey(addr).Hex()) keys = append(keys, ipfs.HexToKey(addr).Hex())
} }
pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(keys)) args = append(args, pq.Array(keys))
} }
if !streamFilters.StorageFilter.IntermediateNodes { if !stateFilter.IntermediateNodes {
pgStr += ` AND state_cids.leaf = TRUE` pgStr += ` AND state_cids.leaf = TRUE`
} }
stateNodeCIDs := make([]ipfs.StateNodeCID, 0) stateNodeCIDs := make([]ipfs.StateNodeCID, 0)
@ -282,7 +283,8 @@ func (ecr *EthCIDRetriever) retrieveStateCIDs(tx *sqlx.Tx, streamFilters config.
return stateNodeCIDs, err return stateNodeCIDs, err
} }
func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]ipfs.StorageNodeCID, error) { // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *EthCIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.StorageFilter, blockNumber int64) ([]ipfs.StorageNodeCID, error) {
log.Debug("retrieving storage cids for block ", blockNumber) log.Debug("retrieving storage cids for block ", blockNumber)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.leaf FROM storage_cids, state_cids, header_cids pgStr := `SELECT storage_cids.cid, state_cids.state_key, storage_cids.storage_key, storage_cids.leaf FROM storage_cids, state_cids, header_cids
@ -290,20 +292,23 @@ func (ecr *EthCIDRetriever) retrieveStorageCIDs(tx *sqlx.Tx, streamFilters confi
AND state_cids.header_id = header_cids.id AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1` AND header_cids.block_number = $1`
args = append(args, blockNumber) args = append(args, blockNumber)
addrLen := len(streamFilters.StorageFilter.Addresses) addrLen := len(storageFilter.Addresses)
if addrLen > 0 { if addrLen > 0 {
keys := make([]string, 0, addrLen) keys := make([]string, 0, addrLen)
for _, addr := range streamFilters.StorageFilter.Addresses { for _, addr := range storageFilter.Addresses {
keys = append(keys, ipfs.HexToKey(addr).Hex()) keys = append(keys, ipfs.HexToKey(addr).Hex())
} }
pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])` pgStr += ` AND state_cids.state_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(keys)) args = append(args, pq.Array(keys))
} if len(storageFilter.StorageKeys) > 0 {
if len(streamFilters.StorageFilter.StorageKeys) > 0 {
pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])` pgStr += ` AND storage_cids.storage_key = ANY($3::VARCHAR(66)[])`
args = append(args, pq.Array(streamFilters.StorageFilter.StorageKeys)) args = append(args, pq.Array(storageFilter.StorageKeys))
} }
if !streamFilters.StorageFilter.IntermediateNodes { } else if len(storageFilter.StorageKeys) > 0 {
pgStr += ` AND storage_cids.storage_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(storageFilter.StorageKeys))
}
if !storageFilter.IntermediateNodes {
pgStr += ` AND storage_cids.leaf = TRUE` pgStr += ` AND storage_cids.leaf = TRUE`
} }
storageNodeCIDs := make([]ipfs.StorageNodeCID, 0) storageNodeCIDs := make([]ipfs.StorageNodeCID, 0)
@ -334,3 +339,7 @@ func (ecr *EthCIDRetriever) RetrieveGapsInData() ([][2]uint64, error) {
} }
return gapRanges, nil return gapRanges, nil
} }
func (ecr *EthCIDRetriever) Database() *postgres.DB {
return ecr.db
}

View File

@ -126,6 +126,7 @@ var (
}, },
TrxFilter: config.TrxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter TrxFilter: config.TrxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter
ReceiptFilter: config.ReceiptFilter{ ReceiptFilter: config.ReceiptFilter{
MatchTxs: true,
Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have
Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
}, },
@ -146,6 +147,7 @@ var (
Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt Dst: []string{"0x0000000000000000000000000000000000000001"}, // We only filter for one of the trxs so we will only get the one corresponding receipt
}, },
ReceiptFilter: config.ReceiptFilter{ ReceiptFilter: config.ReceiptFilter{
MatchTxs: true,
Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have Topic0s: []string{"0x0000000000000000000000000000000000000000000000000000000000000006"}, // Topic isn't one of the topics we have
Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
}, },
@ -195,8 +197,8 @@ var _ = Describe("Retriever", func() {
Describe("RetrieveCIDs", func() { Describe("RetrieveCIDs", func() {
BeforeEach(func() { BeforeEach(func() {
indexErr := repo.Index(mocks.MockCIDPayload) err := repo.Index(mocks.MockCIDPayload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() {
cidWrapper, err := retriever.RetrieveCIDs(openFilter, 1) cidWrapper, err := retriever.RetrieveCIDs(openFilter, 1)
@ -308,20 +310,20 @@ var _ = Describe("Retriever", func() {
Describe("RetrieveFirstBlockNumber", func() { Describe("RetrieveFirstBlockNumber", func() {
It("Gets the number of the first block that has data in the database", func() { It("Gets the number of the first block that has data in the database", func() {
indexErr := repo.Index(mocks.MockCIDPayload) err := repo.Index(mocks.MockCIDPayload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1))) Expect(num).To(Equal(int64(1)))
}) })
It("Gets the number of the first block that has data in the database", func() { It("Gets the number of the first block that has data in the database", func() {
payload := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload.BlockNumber = "1010101" payload.BlockNumber = "1010101"
indexErr := repo.Index(&payload) err := repo.Index(&payload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))
}) })
@ -330,32 +332,32 @@ var _ = Describe("Retriever", func() {
payload1.BlockNumber = "1010101" payload1.BlockNumber = "1010101"
payload2 := payload1 payload2 := payload1
payload2.BlockNumber = "5" payload2.BlockNumber = "5"
indexErr := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr2 := repo.Index(&payload2) err = repo.Index(&payload2)
Expect(indexErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(5))) Expect(num).To(Equal(int64(5)))
}) })
}) })
Describe("RetrieveLastBlockNumber", func() { Describe("RetrieveLastBlockNumber", func() {
It("Gets the number of the latest block that has data in the database", func() { It("Gets the number of the latest block that has data in the database", func() {
indexErr := repo.Index(mocks.MockCIDPayload) err := repo.Index(mocks.MockCIDPayload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1))) Expect(num).To(Equal(int64(1)))
}) })
It("Gets the number of the latest block that has data in the database", func() { It("Gets the number of the latest block that has data in the database", func() {
payload := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload.BlockNumber = "1010101" payload.BlockNumber = "1010101"
indexErr := repo.Index(&payload) err := repo.Index(&payload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))
}) })
@ -364,12 +366,12 @@ var _ = Describe("Retriever", func() {
payload1.BlockNumber = "1010101" payload1.BlockNumber = "1010101"
payload2 := payload1 payload2 := payload1
payload2.BlockNumber = "5" payload2.BlockNumber = "5"
indexErr := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr2 := repo.Index(&payload2) err = repo.Index(&payload2)
Expect(indexErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
num, retrieveErr := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))
}) })
}) })
@ -380,24 +382,24 @@ var _ = Describe("Retriever", func() {
payload1.BlockNumber = "2" payload1.BlockNumber = "2"
payload2 := payload1 payload2 := payload1
payload2.BlockNumber = "3" payload2.BlockNumber = "3"
indexErr1 := repo.Index(mocks.MockCIDPayload) err := repo.Index(mocks.MockCIDPayload)
Expect(indexErr1).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr2 := repo.Index(&payload1) err = repo.Index(&payload1)
Expect(indexErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr3 := repo.Index(&payload2) err = repo.Index(&payload2)
Expect(indexErr3).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, retrieveErr := retriever.RetrieveGapsInData() gaps, err := retriever.RetrieveGapsInData()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(0)) Expect(len(gaps)).To(Equal(0))
}) })
It("Doesn't return the gap from 0 to the earliest block", func() { It("Doesn't return the gap from 0 to the earliest block", func() {
payload := *mocks.MockCIDPayload payload := *mocks.MockCIDPayload
payload.BlockNumber = "5" payload.BlockNumber = "5"
indexErr := repo.Index(&payload) err := repo.Index(&payload)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, retrieveErr := retriever.RetrieveGapsInData() gaps, err := retriever.RetrieveGapsInData()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(0)) Expect(len(gaps)).To(Equal(0))
}) })
@ -406,12 +408,12 @@ var _ = Describe("Retriever", func() {
payload1.BlockNumber = "1010101" payload1.BlockNumber = "1010101"
payload2 := payload1 payload2 := payload1
payload2.BlockNumber = "5" payload2.BlockNumber = "5"
indexErr := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr2 := repo.Index(&payload2) err = repo.Index(&payload2)
Expect(indexErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, retrieveErr := retriever.RetrieveGapsInData() gaps, err := retriever.RetrieveGapsInData()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(1)) Expect(len(gaps)).To(Equal(1))
Expect(gaps[0][0]).To(Equal(uint64(6))) Expect(gaps[0][0]).To(Equal(uint64(6)))
Expect(gaps[0][1]).To(Equal(uint64(1010100))) Expect(gaps[0][1]).To(Equal(uint64(1010100)))
@ -430,20 +432,20 @@ var _ = Describe("Retriever", func() {
payload5.BlockNumber = "102" payload5.BlockNumber = "102"
payload6 := payload5 payload6 := payload5
payload6.BlockNumber = "1000" payload6.BlockNumber = "1000"
indexErr := repo.Index(&payload1) err := repo.Index(&payload1)
Expect(indexErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr2 := repo.Index(&payload2) err = repo.Index(&payload2)
Expect(indexErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr3 := repo.Index(&payload3) err = repo.Index(&payload3)
Expect(indexErr3).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr4 := repo.Index(&payload4) err = repo.Index(&payload4)
Expect(indexErr4).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr5 := repo.Index(&payload5) err = repo.Index(&payload5)
Expect(indexErr5).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexErr6 := repo.Index(&payload6) err = repo.Index(&payload6)
Expect(indexErr6).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
gaps, retrieveErr := retriever.RetrieveGapsInData() gaps, err := retriever.RetrieveGapsInData()
Expect(retrieveErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(gaps)).To(Equal(3)) Expect(len(gaps)).To(Equal(3))
Expect(super_node.ListContainsRange(gaps, [2]uint64{6, 99})).To(BeTrue()) Expect(super_node.ListContainsRange(gaps, [2]uint64{6, 99})).To(BeTrue())
Expect(super_node.ListContainsRange(gaps, [2]uint64{103, 999})).To(BeTrue()) Expect(super_node.ListContainsRange(gaps, [2]uint64{103, 999})).To(BeTrue())