index receipts by the contract address

This commit is contained in:
Ian Norden 2019-06-20 10:59:10 -05:00
parent 23a21c14f3
commit 3fa33fb767
10 changed files with 50 additions and 25 deletions

View File

@ -191,6 +191,7 @@ func subscriptionConfig() {
// Which means we get all receipts by default // Which means we get all receipts by default
ReceiptFilter: config.ReceiptFilter{ ReceiptFilter: config.ReceiptFilter{
Off: viper.GetBool("subscription.receiptFilter.off"), Off: viper.GetBool("subscription.receiptFilter.off"),
Contracts: viper.GetStringSlice("subscription.receiptFilter.contracts"),
Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"), Topic0s: viper.GetStringSlice("subscription.receiptFilter.topic0s"),
}, },

View File

@ -3,6 +3,7 @@ CREATE TABLE public.receipt_cids (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, tx_id INTEGER NOT NULL REFERENCES transaction_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
cid TEXT NOT NULL, cid TEXT NOT NULL,
contract VARCHAR(66),
topic0s VARCHAR(66)[] topic0s VARCHAR(66)[]
); );

View File

@ -17,6 +17,7 @@
] ]
[subscription.receiptFilter] [subscription.receiptFilter]
off = false off = false
contracts = []
topic0s = [ topic0s = [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377" "0x930a61a57a70a73c2a503615b87e2e54fe5b9cdeacda518270b852296ab1a377"
@ -29,12 +30,6 @@
intermediateNodes = false intermediateNodes = false
[subscription.storageFilter] [subscription.storageFilter]
off = true off = true
addresses = [ addresses = []
"", storageKeys = []
""
]
storageKeys = [
"",
""
]
intermediateNodes = false intermediateNodes = false

View File

@ -44,6 +44,7 @@ type TrxFilter struct {
type ReceiptFilter struct { type ReceiptFilter struct {
Off bool Off bool
Contracts []string
Topic0s []string Topic0s []string
} }

View File

@ -87,6 +87,7 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Extract topic0 data from the receipt's logs for indexing // Extract topic0 data from the receipt's logs for indexing
rctMeta := &ReceiptMetaData{ rctMeta := &ReceiptMetaData{
Topic0s: make([]string, 0, len(gethReceipt.Logs)), Topic0s: make([]string, 0, len(gethReceipt.Logs)),
ContractAddress: gethReceipt.ContractAddress.Hex(),
} }
for _, log := range gethReceipt.Logs { for _, log := range gethReceipt.Logs {
if len(log.Topics[0]) < 1 { if len(log.Topics[0]) < 1 {

View File

@ -170,6 +170,7 @@ func (pub *Publisher) publishReceipts(receipts types.Receipts, receiptMeta []*Re
if len(receiptsCids) != len(receipts) { if len(receiptsCids) != len(receipts) {
return nil, errors.New("expected one CID for each receipt") return nil, errors.New("expected one CID for each receipt")
} }
// Keep receipts associated with their transaction
mappedRctCids := make(map[common.Hash]*ReceiptMetaData, len(receiptsCids)) mappedRctCids := make(map[common.Hash]*ReceiptMetaData, len(receiptsCids))
for i, rct := range receipts { for i, rct := range receipts {
mappedRctCids[rct.TxHash] = receiptMeta[i] mappedRctCids[rct.TxHash] = receiptMeta[i]

View File

@ -106,8 +106,8 @@ func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CID
} }
func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error { func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ReceiptMetaData, txID int64) error {
_, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, topic0s) VALUES ($1, $2, $3)`, _, err := tx.Exec(`INSERT INTO public.receipt_cids (tx_id, cid, contract, topic0s) VALUES ($1, $2, $3, $4)`,
txID, cidMeta.CID, pq.Array(cidMeta.Topic0s)) txID, cidMeta.CID, cidMeta.ContractAddress, pq.Array(cidMeta.Topic0s))
return err return err
} }

View File

@ -198,11 +198,16 @@ func (ecr *EthCIDRetriever) retrieveRctCIDs(tx *sqlx.Tx, streamFilters config.Su
AND header_cids.block_number = $1` AND header_cids.block_number = $1`
args = append(args, blockNumber) args = append(args, blockNumber)
if len(streamFilters.ReceiptFilter.Topic0s) > 0 { if len(streamFilters.ReceiptFilter.Topic0s) > 0 {
pgStr += ` AND (receipt_cids.topic0s && $2::VARCHAR(66)[]` pgStr += ` AND ((receipt_cids.topic0s && $2::VARCHAR(66)[]`
args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s)) args = append(args, pq.Array(streamFilters.ReceiptFilter.Topic0s))
} }
if len(streamFilters.ReceiptFilter.Contracts) > 0 {
pgStr += ` AND receipt_cids.contract = ANY($3::VARCHAR(66)[])`
} else {
pgStr += `)`
}
if len(trxIds) > 0 { if len(trxIds) > 0 {
pgStr += ` OR receipt_cids.tx_id = ANY($3::INTEGER[]))` pgStr += ` OR receipt_cids.tx_id = ANY($4::INTEGER[]))`
args = append(args, pq.Array(trxIds)) args = append(args, pq.Array(trxIds))
} else { } else {
pgStr += `)` pgStr += `)`

View File

@ -128,7 +128,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error { func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) { if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, streamFilters.ReceiptFilter.Contracts, payload.ReceiptMetaData[i].ContractAddress, trxHashes) {
receiptForStorage := (*types.ReceiptForStorage)(receipt) receiptForStorage := (*types.ReceiptForStorage)(receipt)
receiptBuffer := new(bytes.Buffer) receiptBuffer := new(bytes.Buffer)
err := receiptForStorage.EncodeRLP(receiptBuffer) err := receiptForStorage.EncodeRLP(receiptBuffer)
@ -142,16 +142,20 @@ func (s *Screener) filerReceipts(streamFilters config.Subscription, response *Re
return nil return nil
} }
func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, wantedTrxHashes []common.Hash) bool { func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics, wantedContracts []string, actualContract string, wantedTrxHashes []common.Hash) bool {
// If we aren't filtering for any topics, all topics are a go // If we aren't filtering for any topics or contracts, all topics are a go
if len(wantedTopics) == 0 { if len(wantedTopics) == 0 && len(wantedContracts) == 0 {
return true return true
} }
// No matter what filters we have, we keep receipts for the trxs we are interested in
for _, wantedTrxHash := range wantedTrxHashes { for _, wantedTrxHash := range wantedTrxHashes {
if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) { if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) {
return true return true
} }
} }
if len(wantedContracts) == 0 {
// We keep all receipts that have logs we are interested in
for _, wantedTopic := range wantedTopics { for _, wantedTopic := range wantedTopics {
for _, actualTopic := range actualTopics { for _, actualTopic := range actualTopics {
if wantedTopic == actualTopic { if wantedTopic == actualTopic {
@ -159,6 +163,21 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, want
} }
} }
} }
} else {
// We only keep receipts with logs of interest if we are interested in that contract
for _, wantedTopic := range wantedTopics {
for _, actualTopic := range actualTopics {
if wantedTopic == actualTopic {
for _, wantedContract := range wantedContracts {
if wantedContract == actualContract {
return true
}
}
}
}
}
}
return false return false
} }

View File

@ -143,6 +143,7 @@ type StorageNodeCID struct {
type ReceiptMetaData struct { type ReceiptMetaData struct {
CID string CID string
Topic0s []string Topic0s []string
ContractAddress string
} }
// TrxMetaData wraps some additional data around our transaction CID for indexing // TrxMetaData wraps some additional data around our transaction CID for indexing