diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go index 140952a0..b7bd6273 100644 --- a/cmd/streamSubscribe.go +++ b/cmd/streamSubscribe.go @@ -190,8 +190,9 @@ func subscriptionConfig() { // Below defaults to false and one slice of length 0 // Which means we get all receipts by default ReceiptFilter: config.ReceiptFilter{ - Off: viper.GetBool("subscription.receiptFilter.off"), - Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"), + Off: viper.GetBool("subscription.receiptFilter.off"), + Contracts: viper.GetStringSlice("subscription.receiptFilter.contracts"), + Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"), }, // Below defaults to two false, and a slice of length 0 diff --git a/db/migrations/00030_create_receipt_cids_table.sql b/db/migrations/00030_create_receipt_cids_table.sql index 29372955..f53ab730 100644 --- a/db/migrations/00030_create_receipt_cids_table.sql +++ b/db/migrations/00030_create_receipt_cids_table.sql @@ -3,6 +3,7 @@ CREATE TABLE public.receipt_cids ( id SERIAL PRIMARY KEY, tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, cid TEXT NOT NULL, + contract VARCHAR(66), topic0s VARCHAR(66)[] ); diff --git a/environments/seedNodeSubscription.toml b/environments/seedNodeSubscription.toml index ec208ebd..3f899939 100644 --- a/environments/seedNodeSubscription.toml +++ b/environments/seedNodeSubscription.toml @@ -17,6 +17,7 @@ ] [subscription.receiptFilter] off = false + contracts = [] topic0s = [ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" @@ -29,12 +30,6 @@ intermediateNodes = false [subscription.storageFilter] off = true - addresses = [ - "", - "" - ] - storageKeys = [ - "", - "" - ] + addresses = [] + storageKeys = [] intermediateNodes = false \ No newline at end of file diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go index c304b066..0efb71bb 100644 --- a/pkg/config/subscription.go +++ b/pkg/config/subscription.go @@ -44,6 +44,7 @@ type TrxFilter struct { type ReceiptFilter struct { Off bool + Contracts []string Topic0s []string } diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index 82609795..179d5b2a 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -86,7 +86,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { } // Extract topic0 data from the receipt's logs for indexing rctMeta := &ReceiptMetaData{ - Topic0s: make([]string, 0, len(gethReceipt.Logs)), + Topic0s: make([]string, 0, len(gethReceipt.Logs)), + ContractAddress: gethReceipt.ContractAddress.Hex(), } for _, log := range gethReceipt.Logs { if len(log.Topics[0]) < 1 { diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index 7ec63828..4d0158ef 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -170,6 +170,7 @@ func (pub *Publisher) publishReceipts(receipts types.Receipts, receiptMeta []*Re if len(receiptsCids) != len(receipts) { return nil, errors.New("expected one CID for each receipt") } + // Keep receipts associated with their transaction mappedRctCids := make(map[common.Hash]*ReceiptMetaData, len(receiptsCids)) for i, rct := range receipts { mappedRctCids[rct.TxHash] = receiptMeta[i] diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go index 320a3860..317e8d78 100644 --- a/pkg/ipfs/repository.go +++ b/pkg/ipfs/repository.go @@ -106,8 +106,8 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CID } func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error { - _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, topic0s) VALUES ($1, $2, $3)`, - txID, cidMeta.CID, pq.Array(cidMeta.Topic0s)) + _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s) VALUES ($1, $2, $3, $4)`, + txID, cidMeta.CID, cidMeta.ContractAddress, pq.Array(cidMeta.Topic0s)) return err } diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go index 06394a0b..91b4b1df 100644 --- a/pkg/ipfs/retreiver.go +++ b/pkg/ipfs/retreiver.go @@ -198,11 +198,16 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su AND header_cids.block_number = $1` args = append(args, blockNumber) if len(streamFilters.ReceiptFilter.Topic0s) > 0 { - pgStr += ` AND (receipt_cids.topic0s && $2::VARCHAR(66)[]` + pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]` args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s)) } + if len(streamFilters.ReceiptFilter.Contracts) > 0 { + pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[])` + } else { + pgStr += `)` + } if len(trxIds) > 0 { - pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` + pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))` args = append(args, pq.Array(trxIds)) } else { pgStr += `)` diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go index 0ddf027e..f59a55c4 100644 --- a/pkg/ipfs/screener.go +++ b/pkg/ipfs/screener.go @@ -128,7 +128,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { for i, receipt := range payload.Receipts { - if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) { + if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) { receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptBuffer := new(bytes.Buffer) err := receiptForStorage.EncodeRLP(receiptBuffer) @@ -142,23 +142,42 @@ func (s *Screener) filerReceipts(streamFilters config.Subscription, response *Re return nil } -func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, wantedTrxHashes []common.Hash) bool { - // If we aren't filtering for any topics, all topics are a go - if len(wantedTopics) == 0 { +func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool { + // If we aren't filtering for any topics or contracts, all topics are a go + if len(wantedTopics) == 0 && len(wantedContracts) == 0 { return true } + // No matter what filters we have, we keep receipts for the trxs we are interested in for _, wantedTrxHash := range wantedTrxHashes { if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { return true } } - for _, wantedTopic := range wantedTopics { - for _, actualTopic := range actualTopics { - if wantedTopic == actualTopic { - return true + + if len(wantedContracts) == 0 { + // We keep all receipts that have logs we are interested in + for _, wantedTopic := range wantedTopics { + for _, actualTopic := range actualTopics { + if wantedTopic == actualTopic { + return true + } + } + } + } else { + // We only keep receipts with logs of interest if we are interested in that contract + for _, wantedTopic := range wantedTopics { + for _, actualTopic := range actualTopics { + if wantedTopic == actualTopic { + for _, wantedContract := range wantedContracts { + if wantedContract == actualContract { + return true + } + } + } } } } + return false } diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index 522241b7..b1b43dec 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -141,8 +141,9 @@ type StorageNodeCID struct { // ReceiptMetaData wraps some additional data around our receipt CIDs for indexing type ReceiptMetaData struct { - CID string - Topic0s []string + CID string + Topic0s []string + ContractAddress string } // TrxMetaData wraps some additional data around our transaction CID for indexing