Clean up #231
172
cmd/subscribe.go
172
cmd/subscribe.go
@ -1,172 +0,0 @@
|
|||||||
// Copyright © 2019 Vulcanize, Inc
|
|
||||||
//
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
//
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
//
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package cmd
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
"github.com/spf13/cobra"
|
|
||||||
"github.com/spf13/viper"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/client"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
|
|
||||||
w "github.com/cerc-io/ipld-eth-server/v4/pkg/serve"
|
|
||||||
)
|
|
||||||
|
|
||||||
// subscribeCmd represents the subscribe command
|
|
||||||
var subscribeCmd = &cobra.Command{
|
|
||||||
Use: "subscribe",
|
|
||||||
Short: "This command is used to subscribe to the eth ipfs watcher data stream with the provided filters",
|
|
||||||
Long: `This command is for demo and testing purposes and is used to subscribe to the watcher with the provided subscription configuration parameters.
|
|
||||||
It does not do anything with the data streamed from the watcher other than unpack it and print it out for demonstration purposes.`,
|
|
||||||
Run: func(cmd *cobra.Command, args []string) {
|
|
||||||
subCommand = cmd.CalledAs()
|
|
||||||
logWithCommand = *log.WithField("SubCommand", subCommand)
|
|
||||||
subscribe()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
rootCmd.AddCommand(subscribeCmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
func subscribe() {
|
|
||||||
// Prep the subscription config/filters to be sent to the server
|
|
||||||
ethSubConfig, err := eth.NewEthSubscriptionConfig()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a new rpc client and a subscription streamer with that client
|
|
||||||
rpcClient, err := getRPCClient()
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Fatal(err)
|
|
||||||
}
|
|
||||||
subClient := client.NewClient(rpcClient)
|
|
||||||
|
|
||||||
// Buffered channel for reading subscription payloads
|
|
||||||
payloadChan := make(chan w.SubscriptionPayload, 20000)
|
|
||||||
|
|
||||||
// Subscribe to the watcher service with the given config/filter parameters
|
|
||||||
sub, err := subClient.Stream(payloadChan, *ethSubConfig)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Fatal(err)
|
|
||||||
}
|
|
||||||
logWithCommand.Info("awaiting payloads")
|
|
||||||
// Receive response payloads and print out the results
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case payload := <-payloadChan:
|
|
||||||
if payload.Err != "" {
|
|
||||||
logWithCommand.Error(payload.Err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var ethData eth.IPLDs
|
|
||||||
if err := rlp.DecodeBytes(payload.Data, ðData); err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var header types.Header
|
|
||||||
err = rlp.Decode(bytes.NewBuffer(ethData.Header.Data), &header)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Header number %d, hash %s\n", header.Number.Int64(), header.Hash().Hex())
|
|
||||||
fmt.Printf("header: %v\n", header)
|
|
||||||
for _, trxRlp := range ethData.Transactions {
|
|
||||||
var trx types.Transaction
|
|
||||||
buff := bytes.NewBuffer(trxRlp.Data)
|
|
||||||
stream := rlp.NewStream(buff, 0)
|
|
||||||
err := trx.DecodeRLP(stream)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Transaction with hash %s\n", trx.Hash().Hex())
|
|
||||||
fmt.Printf("trx: %v\n", trx)
|
|
||||||
}
|
|
||||||
for _, rctRlp := range ethData.Receipts {
|
|
||||||
var rct types.Receipt
|
|
||||||
buff := bytes.NewBuffer(rctRlp.Data)
|
|
||||||
stream := rlp.NewStream(buff, 0)
|
|
||||||
err = rct.DecodeRLP(stream)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Receipt with block hash %s, trx hash %s\n", rct.BlockHash.Hex(), rct.TxHash.Hex())
|
|
||||||
fmt.Printf("rct: %v\n", rct)
|
|
||||||
for _, l := range rct.Logs {
|
|
||||||
if len(l.Topics) < 1 {
|
|
||||||
logWithCommand.Error(fmt.Sprintf("log only has %d topics", len(l.Topics)))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Log for block hash %s, trx hash %s, address %s, and with topic0 %s\n",
|
|
||||||
l.BlockHash.Hex(), l.TxHash.Hex(), l.Address.Hex(), l.Topics[0].Hex())
|
|
||||||
fmt.Printf("log: %v\n", l)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// This assumes leafs only
|
|
||||||
for _, stateNode := range ethData.StateNodes {
|
|
||||||
var acct types.StateAccount
|
|
||||||
err = rlp.DecodeBytes(stateNode.IPLD.Data, &acct)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Account for key %s, and root %s, with balance %s\n",
|
|
||||||
stateNode.StateLeafKey.Hex(), acct.Root.Hex(), acct.Balance.String())
|
|
||||||
fmt.Printf("state account: %+v\n", acct)
|
|
||||||
}
|
|
||||||
for _, storageNode := range ethData.StorageNodes {
|
|
||||||
fmt.Printf("Storage for state key %s ", storageNode.StateLeafKey.Hex())
|
|
||||||
fmt.Printf("with storage key %s\n", storageNode.StorageLeafKey.Hex())
|
|
||||||
var i []interface{}
|
|
||||||
err := rlp.DecodeBytes(storageNode.IPLD.Data, &i)
|
|
||||||
if err != nil {
|
|
||||||
logWithCommand.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// if a value node
|
|
||||||
if len(i) == 1 {
|
|
||||||
valueBytes, ok := i[0].([]byte)
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
fmt.Printf("Storage leaf key: %s, and value hash: %s\n",
|
|
||||||
storageNode.StorageLeafKey.Hex(), common.BytesToHash(valueBytes).Hex())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case err = <-sub.Err():
|
|
||||||
logWithCommand.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getRPCClient() (*rpc.Client, error) {
|
|
||||||
vulcPath := viper.GetString("watcher.ethSubscription.wsPath")
|
|
||||||
if vulcPath == "" {
|
|
||||||
vulcPath = "ws://127.0.0.1:8080" // default to and try the default ws url if no path is provided
|
|
||||||
}
|
|
||||||
return rpc.Dial(vulcPath)
|
|
||||||
}
|
|
@ -1,44 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
// Client is used by watchers to stream chain IPLD data from a vulcanizedb ipld-eth-server
|
|
||||||
package client
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/serve"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client is used to subscribe to the ipld-eth-server ipld data stream
|
|
||||||
type Client struct {
|
|
||||||
c *rpc.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewClient creates a new Client
|
|
||||||
func NewClient(c *rpc.Client) *Client {
|
|
||||||
return &Client{
|
|
||||||
c: c,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stream is the main loop for subscribing to iplds from an ipld-eth-server server
|
|
||||||
func (c *Client) Stream(payloadChan chan serve.SubscriptionPayload, params eth.SubscriptionSettings) (*rpc.ClientSubscription, error) {
|
|
||||||
return c.c.Subscribe(context.Background(), "vdb", payloadChan, "stream", params)
|
|
||||||
}
|
|
@ -1,366 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package eth
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"github.com/multiformats/go-multihash"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Filterer interface for substituing mocks in tests
|
|
||||||
type Filterer interface {
|
|
||||||
Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResponseFilterer satisfies the ResponseFilterer interface for ethereum
|
|
||||||
type ResponseFilterer struct{}
|
|
||||||
|
|
||||||
// NewResponseFilterer creates a new Filterer satisfying the ResponseFilterer interface
|
|
||||||
func NewResponseFilterer() *ResponseFilterer {
|
|
||||||
return &ResponseFilterer{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Filter is used to filter through eth data to extract and package requested data into a Payload
|
|
||||||
func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) {
|
|
||||||
if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) {
|
|
||||||
response := new(IPLDs)
|
|
||||||
response.TotalDifficulty = payload.TotalDifficulty
|
|
||||||
if err := s.filterHeaders(filter.HeaderFilter, response, payload); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
txHashes, err := s.filterTransactions(filter.TxFilter, response, payload)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var filterTxs []common.Hash
|
|
||||||
if filter.ReceiptFilter.MatchTxs {
|
|
||||||
filterTxs = txHashes
|
|
||||||
}
|
|
||||||
if err := s.filerReceipts(filter.ReceiptFilter, response, payload, filterTxs); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := s.filterStateAndStorage(filter.StateFilter, filter.StorageFilter, response, payload); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
response.BlockNumber = payload.Block.Number()
|
|
||||||
return response, nil
|
|
||||||
}
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error {
|
|
||||||
if !headerFilter.Off {
|
|
||||||
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthHeader, headerRLP, multihash.KECCAK_256)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
response.Header = models.IPLDModel{
|
|
||||||
BlockNumber: payload.Block.Number().String(),
|
|
||||||
Data: headerRLP,
|
|
||||||
Key: cid.String(),
|
|
||||||
}
|
|
||||||
if headerFilter.Uncles {
|
|
||||||
response.Uncles = make([]models.IPLDModel, len(payload.Block.Body().Uncles))
|
|
||||||
for i, uncle := range payload.Block.Body().Uncles {
|
|
||||||
uncleRlp, err := rlp.EncodeToBytes(uncle)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthHeader, uncleRlp, multihash.KECCAK_256)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
response.Uncles[i] = models.IPLDModel{
|
|
||||||
BlockNumber: uncle.Number.String(),
|
|
||||||
Data: uncleRlp,
|
|
||||||
Key: cid.String(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkRange(start, end, actual int64) bool {
|
|
||||||
if (end <= 0 || end >= actual) && start <= actual {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) {
|
|
||||||
var trxHashes []common.Hash
|
|
||||||
if !trxFilter.Off {
|
|
||||||
trxLen := len(payload.Block.Body().Transactions)
|
|
||||||
trxHashes = make([]common.Hash, 0, trxLen)
|
|
||||||
response.Transactions = make([]models.IPLDModel, 0, trxLen)
|
|
||||||
for i, trx := range payload.Block.Body().Transactions {
|
|
||||||
// TODO: check if want corresponding receipt and if we do we must include this transaction
|
|
||||||
if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
|
|
||||||
trxBuffer := new(bytes.Buffer)
|
|
||||||
if err := trx.EncodeRLP(trxBuffer); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
data := trxBuffer.Bytes()
|
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthTx, data, multihash.KECCAK_256)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
response.Transactions = append(response.Transactions, models.IPLDModel{
|
|
||||||
Data: data,
|
|
||||||
Key: cid.String(),
|
|
||||||
})
|
|
||||||
trxHashes = append(trxHashes, trx.Hash())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return trxHashes, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// checkTransactionAddrs returns true if either the transaction src and dst are one of the wanted src and dst addresses
|
|
||||||
func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool {
|
|
||||||
// If we aren't filtering for any addresses, every transaction is a go
|
|
||||||
if len(wantedDst) == 0 && len(wantedSrc) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for _, src := range wantedSrc {
|
|
||||||
if src == actualSrc {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, dst := range wantedDst {
|
|
||||||
if dst == actualDst {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error {
|
|
||||||
if !receiptFilter.Off {
|
|
||||||
response.Receipts = make([]models.IPLDModel, 0, len(payload.Receipts))
|
|
||||||
rctLeafCID, rctIPLDData, err := GetRctLeafNodeData(payload.Receipts)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for idx, receipt := range payload.Receipts {
|
|
||||||
// topics is always length 4
|
|
||||||
topics := make([][]string, 4)
|
|
||||||
contracts := make([]string, len(receipt.Logs))
|
|
||||||
for _, l := range receipt.Logs {
|
|
||||||
contracts = append(contracts, l.Address.String())
|
|
||||||
for idx, t := range l.Topics {
|
|
||||||
topics[idx] = append(topics[idx], t.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Verify this filter logic.
|
|
||||||
if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) {
|
|
||||||
response.Receipts = append(response.Receipts, models.IPLDModel{
|
|
||||||
BlockNumber: payload.Block.Number().String(),
|
|
||||||
Data: rctIPLDData[idx],
|
|
||||||
Key: rctLeafCID[idx].String(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wantedAddresses []string, actualAddresses []string, wantedTrxHashes []common.Hash) bool {
|
|
||||||
// If we aren't filtering for any topics, contracts, or corresponding trxs then all receipts are a go
|
|
||||||
if len(wantedTopics) == 0 && len(wantedAddresses) == 0 && len(wantedTrxHashes) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
// Keep receipts that are from watched txs
|
|
||||||
for _, wantedTrxHash := range wantedTrxHashes {
|
|
||||||
if bytes.Equal(wantedTrxHash.Bytes(), rct.TxHash.Bytes()) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If there are no wanted contract addresses, we keep all receipts that match the topic filter
|
|
||||||
if len(wantedAddresses) == 0 {
|
|
||||||
if match := filterMatch(wantedTopics, actualTopics); match {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If there are wanted contract addresses to filter on
|
|
||||||
for _, wantedAddr := range wantedAddresses {
|
|
||||||
// and this is an address of interest
|
|
||||||
for _, actualAddr := range actualAddresses {
|
|
||||||
if wantedAddr == actualAddr {
|
|
||||||
// we keep the receipt if it matches on the topic filter
|
|
||||||
if match := filterMatch(wantedTopics, actualTopics); match {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterMatch returns true if the actualTopics conform to the wantedTopics filter
|
|
||||||
func filterMatch(wantedTopics, actualTopics [][]string) bool {
|
|
||||||
// actualTopics should always be length 4, but the members can be nil slices
|
|
||||||
matches := 0
|
|
||||||
for i, actualTopicSet := range actualTopics {
|
|
||||||
if i < len(wantedTopics) && len(wantedTopics[i]) > 0 {
|
|
||||||
// If we have topics in this filter slot, count as a match if one of the topics matches
|
|
||||||
matches += slicesShareString(actualTopicSet, wantedTopics[i])
|
|
||||||
} else {
|
|
||||||
// Filter slot is either empty or doesn't exist => not matching any topics at this slot => counts as a match
|
|
||||||
matches++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return matches == 4
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns 1 if the two slices have a string in common, 0 if they do not
|
|
||||||
func slicesShareString(slice1, slice2 []string) int {
|
|
||||||
for _, str1 := range slice1 {
|
|
||||||
for _, str2 := range slice2 {
|
|
||||||
if str1 == str2 {
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterStateAndStorage filters state and storage nodes into the response according to the provided filters
|
|
||||||
func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error {
|
|
||||||
response.StateNodes = make([]StateNode, 0, len(payload.StateNodes))
|
|
||||||
response.StorageNodes = make([]StorageNode, 0)
|
|
||||||
stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses))
|
|
||||||
for i, addr := range stateFilter.Addresses {
|
|
||||||
stateAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes())
|
|
||||||
}
|
|
||||||
storageAddressFilters := make([]common.Hash, len(storageFilter.Addresses))
|
|
||||||
for i, addr := range storageFilter.Addresses {
|
|
||||||
storageAddressFilters[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes())
|
|
||||||
}
|
|
||||||
storageKeyFilters := make([]common.Hash, len(storageFilter.StorageKeys))
|
|
||||||
for i, store := range storageFilter.StorageKeys {
|
|
||||||
storageKeyFilters[i] = common.HexToHash(store)
|
|
||||||
}
|
|
||||||
for _, stateNode := range payload.StateNodes {
|
|
||||||
if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) {
|
|
||||||
if stateNode.NodeType == sdtypes.Leaf || stateFilter.IntermediateNodes {
|
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.NodeValue, multihash.KECCAK_256)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
response.StateNodes = append(response.StateNodes, StateNode{
|
|
||||||
StateLeafKey: common.BytesToHash(stateNode.LeafKey),
|
|
||||||
Path: stateNode.Path,
|
|
||||||
IPLD: models.IPLDModel{
|
|
||||||
BlockNumber: payload.Block.Number().String(),
|
|
||||||
Data: stateNode.NodeValue,
|
|
||||||
Key: cid.String(),
|
|
||||||
},
|
|
||||||
Type: stateNode.NodeType,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) {
|
|
||||||
for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
|
|
||||||
if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) {
|
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.NodeValue, multihash.KECCAK_256)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
response.StorageNodes = append(response.StorageNodes, StorageNode{
|
|
||||||
StateLeafKey: common.BytesToHash(stateNode.LeafKey),
|
|
||||||
StorageLeafKey: common.BytesToHash(storageNode.LeafKey),
|
|
||||||
IPLD: models.IPLDModel{
|
|
||||||
BlockNumber: payload.Block.Number().String(),
|
|
||||||
Data: storageNode.NodeValue,
|
|
||||||
Key: cid.String(),
|
|
||||||
},
|
|
||||||
Type: storageNode.NodeType,
|
|
||||||
Path: storageNode.Path,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkNodeKeys(wantedKeys []common.Hash, actualKey []byte) bool {
|
|
||||||
// If we aren't filtering for any specific keys, all nodes are a go
|
|
||||||
if len(wantedKeys) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
for _, key := range wantedKeys {
|
|
||||||
if bytes.Equal(key.Bytes(), actualKey) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRctLeafNodeData converts the receipts to receipt trie and returns the receipt leaf node IPLD data and
|
|
||||||
// corresponding CIDs
|
|
||||||
func GetRctLeafNodeData(rcts types.Receipts) ([]cid.Cid, [][]byte, error) {
|
|
||||||
receiptTrie := ipld.NewRctTrie()
|
|
||||||
for idx, rct := range rcts {
|
|
||||||
ethRct, err := ipld.NewReceipt(rct)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
if err = receiptTrie.Add(idx, ethRct.RawData()); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rctLeafNodes, keys, err := receiptTrie.GetLeafNodes()
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ethRctleafNodeCids := make([]cid.Cid, len(rctLeafNodes))
|
|
||||||
ethRctleafNodeData := make([][]byte, len(rctLeafNodes))
|
|
||||||
for i, rln := range rctLeafNodes {
|
|
||||||
var idx uint
|
|
||||||
|
|
||||||
r := bytes.NewReader(keys[i].TrieKey)
|
|
||||||
err = rlp.Decode(r, &idx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
ethRctleafNodeCids[idx] = rln.Cid()
|
|
||||||
ethRctleafNodeData[idx] = rln.RawData()
|
|
||||||
}
|
|
||||||
|
|
||||||
return ethRctleafNodeCids, ethRctleafNodeData, nil
|
|
||||||
}
|
|
@ -1,208 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package eth_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
|
||||||
. "github.com/onsi/gomega"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/shared"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
filterer *eth.ResponseFilterer
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ = Describe("Filterer", func() {
|
|
||||||
Describe("FilterResponse", func() {
|
|
||||||
BeforeEach(func() {
|
|
||||||
filterer = eth.NewResponseFilterer()
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() {
|
|
||||||
iplds, err := filterer.Filter(openFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds).ToNot(BeNil())
|
|
||||||
Expect(iplds.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header))
|
|
||||||
var expectedEmptyUncles []models.IPLDModel
|
|
||||||
Expect(iplds.Uncles).To(Equal(expectedEmptyUncles))
|
|
||||||
Expect(len(iplds.Transactions)).To(Equal(4))
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx1)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx2)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Transactions, test_helpers.Tx3)).To(BeTrue())
|
|
||||||
Expect(len(iplds.Receipts)).To(Equal(4))
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct1IPLD)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct2IPLD)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds.Receipts, test_helpers.Rct3IPLD)).To(BeTrue())
|
|
||||||
Expect(len(iplds.StateNodes)).To(Equal(2))
|
|
||||||
for _, stateNode := range iplds.StateNodes {
|
|
||||||
Expect(stateNode.Type).To(Equal(sdtypes.Leaf))
|
|
||||||
if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.AccountLeafKey) {
|
|
||||||
Expect(stateNode.IPLD).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.State2IPLD.RawData(),
|
|
||||||
Key: test_helpers.State2IPLD.Cid().String(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
if bytes.Equal(stateNode.StateLeafKey.Bytes(), test_helpers.ContractLeafKey) {
|
|
||||||
Expect(stateNode.IPLD).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.State1IPLD.RawData(),
|
|
||||||
Key: test_helpers.State1IPLD.Cid().String(),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes))
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Applies filters from the provided config.Subscription", func() {
|
|
||||||
iplds1, err := filterer.Filter(rctAddressFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds1).ToNot(BeNil())
|
|
||||||
Expect(iplds1.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds1.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds1.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds1.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds1.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds1.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds1.Receipts)).To(Equal(1))
|
|
||||||
Expect(iplds1.Receipts[0]).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.Rct1IPLD,
|
|
||||||
Key: test_helpers.Rct1CID.String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds2, err := filterer.Filter(rctTopicsFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds2).ToNot(BeNil())
|
|
||||||
Expect(iplds2.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds2.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds2.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds2.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds2.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds2.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds2.Receipts)).To(Equal(1))
|
|
||||||
Expect(iplds2.Receipts[0]).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.Rct1IPLD,
|
|
||||||
Key: test_helpers.Rct1CID.String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds3, err := filterer.Filter(rctTopicsAndAddressFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds3).ToNot(BeNil())
|
|
||||||
Expect(iplds3.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds3.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds3.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds3.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds3.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds3.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds3.Receipts)).To(Equal(1))
|
|
||||||
Expect(iplds3.Receipts[0]).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.Rct1IPLD,
|
|
||||||
Key: test_helpers.Rct1CID.String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds4, err := filterer.Filter(rctAddressesAndTopicFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds4).ToNot(BeNil())
|
|
||||||
Expect(iplds4.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds4.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds4.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds4.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds4.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds4.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds4.Receipts)).To(Equal(1))
|
|
||||||
Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.Rct2IPLD,
|
|
||||||
Key: test_helpers.Rct2CID.String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds5, err := filterer.Filter(rctsForAllCollectedTrxs, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds5).ToNot(BeNil())
|
|
||||||
Expect(iplds5.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds5.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds5.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds5.Transactions)).To(Equal(4))
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx1)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx3)).To(BeTrue())
|
|
||||||
Expect(len(iplds5.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds5.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds5.Receipts)).To(Equal(4))
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct1IPLD)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct2IPLD)).To(BeTrue())
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Receipts, test_helpers.Rct3IPLD)).To(BeTrue())
|
|
||||||
|
|
||||||
iplds6, err := filterer.Filter(rctsForSelectCollectedTrxs, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds6).ToNot(BeNil())
|
|
||||||
Expect(iplds6.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds6.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds6.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds6.Transactions)).To(Equal(1))
|
|
||||||
Expect(shared.IPLDsContainBytes(iplds5.Transactions, test_helpers.Tx2)).To(BeTrue())
|
|
||||||
Expect(len(iplds6.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds6.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds6.Receipts)).To(Equal(1))
|
|
||||||
Expect(iplds4.Receipts[0]).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.Rct2IPLD,
|
|
||||||
Key: test_helpers.Rct2CID.String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds7, err := filterer.Filter(stateFilter, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds7).ToNot(BeNil())
|
|
||||||
Expect(iplds7.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds7.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds7.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds7.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds7.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds7.Receipts)).To(Equal(0))
|
|
||||||
Expect(len(iplds7.StateNodes)).To(Equal(1))
|
|
||||||
Expect(iplds7.StateNodes[0].StateLeafKey.Bytes()).To(Equal(test_helpers.AccountLeafKey))
|
|
||||||
Expect(iplds7.StateNodes[0].IPLD).To(Equal(models.IPLDModel{
|
|
||||||
BlockNumber: test_helpers.BlockNumber.String(),
|
|
||||||
Data: test_helpers.State2IPLD.RawData(),
|
|
||||||
Key: test_helpers.State2IPLD.Cid().String(),
|
|
||||||
}))
|
|
||||||
|
|
||||||
iplds8, err := filterer.Filter(rctTopicsAndAddressFilterFail, test_helpers.MockConvertedPayload)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds8).ToNot(BeNil())
|
|
||||||
Expect(iplds8.BlockNumber.Int64()).To(Equal(test_helpers.MockIPLDs.BlockNumber.Int64()))
|
|
||||||
Expect(iplds8.Header).To(Equal(models.IPLDModel{}))
|
|
||||||
Expect(len(iplds8.Uncles)).To(Equal(0))
|
|
||||||
Expect(len(iplds8.Transactions)).To(Equal(0))
|
|
||||||
Expect(len(iplds8.StorageNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds8.StateNodes)).To(Equal(0))
|
|
||||||
Expect(len(iplds8.Receipts)).To(Equal(0))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
@ -1,248 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package eth
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"math/big"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/shared"
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
"github.com/jmoiron/sqlx"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Fetcher interface for substituting mocks in tests
|
|
||||||
type Fetcher interface {
|
|
||||||
Fetch(cids CIDWrapper) (*IPLDs, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// IPLDFetcher satisfies the IPLDFetcher interface for ethereum
|
|
||||||
// It interfaces directly with PG-IPFS
|
|
||||||
type IPLDFetcher struct {
|
|
||||||
db *sqlx.DB
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewIPLDFetcher creates a pointer to a new IPLDFetcher
|
|
||||||
func NewIPLDFetcher(db *sqlx.DB) *IPLDFetcher {
|
|
||||||
return &IPLDFetcher{
|
|
||||||
db: db,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
|
|
||||||
func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) {
|
|
||||||
log.Debug("fetching iplds")
|
|
||||||
iplds := new(IPLDs)
|
|
||||||
var ok bool
|
|
||||||
iplds.TotalDifficulty, ok = new(big.Int).SetString(cids.Header.TotalDifficulty, 10)
|
|
||||||
if !ok {
|
|
||||||
return nil, errors.New("eth fetcher: unable to set total difficulty")
|
|
||||||
}
|
|
||||||
iplds.BlockNumber = cids.BlockNumber
|
|
||||||
|
|
||||||
tx, err := f.db.Beginx()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if p := recover(); p != nil {
|
|
||||||
shared.Rollback(tx)
|
|
||||||
panic(p)
|
|
||||||
} else if err != nil {
|
|
||||||
shared.Rollback(tx)
|
|
||||||
} else {
|
|
||||||
err = tx.Commit()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
iplds.Header, err = f.FetchHeader(tx, cids.Header)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
iplds.Uncles, err = f.FetchUncles(tx, cids.Uncles)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
iplds.Transactions, err = f.FetchTrxs(tx, cids.Transactions)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
iplds.Receipts, err = f.FetchRcts(tx, cids.Receipts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
iplds.StateNodes, err = f.FetchState(tx, cids.StateNodes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
iplds.StorageNodes, err = f.FetchStorage(tx, cids.StorageNodes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error())
|
|
||||||
}
|
|
||||||
return iplds, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchHeader fetches header
|
|
||||||
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) {
|
|
||||||
log.Debug("fetching header ipld")
|
|
||||||
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return models.IPLDModel{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
headerBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return models.IPLDModel{}, err
|
|
||||||
}
|
|
||||||
return models.IPLDModel{
|
|
||||||
BlockNumber: c.BlockNumber,
|
|
||||||
Data: headerBytes,
|
|
||||||
Key: c.CID,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchUncles fetches uncles
|
|
||||||
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) {
|
|
||||||
log.Debug("fetching uncle iplds")
|
|
||||||
uncleIPLDs := make([]models.IPLDModel, len(cids))
|
|
||||||
for i, c := range cids {
|
|
||||||
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
uncleBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
uncleIPLDs[i] = models.IPLDModel{
|
|
||||||
BlockNumber: c.BlockNumber,
|
|
||||||
Data: uncleBytes,
|
|
||||||
Key: c.CID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return uncleIPLDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchTrxs fetches transactions
|
|
||||||
func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]models.IPLDModel, error) {
|
|
||||||
log.Debug("fetching transaction iplds")
|
|
||||||
trxIPLDs := make([]models.IPLDModel, len(cids))
|
|
||||||
for i, c := range cids {
|
|
||||||
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
txBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.MhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
trxIPLDs[i] = models.IPLDModel{
|
|
||||||
BlockNumber: c.BlockNumber,
|
|
||||||
Data: txBytes,
|
|
||||||
Key: c.CID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return trxIPLDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchRcts fetches receipts
|
|
||||||
func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]models.IPLDModel, error) {
|
|
||||||
log.Debug("fetching receipt iplds")
|
|
||||||
rctIPLDs := make([]models.IPLDModel, len(cids))
|
|
||||||
for i, c := range cids {
|
|
||||||
blockNumber, err := strconv.ParseUint(c.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
rctBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, c.LeafMhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
//nodeVal, err := DecodeLeafNode(rctBytes)
|
|
||||||
rctIPLDs[i] = models.IPLDModel{
|
|
||||||
BlockNumber: c.BlockNumber,
|
|
||||||
Data: rctBytes,
|
|
||||||
Key: c.LeafCID,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return rctIPLDs, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchState fetches state nodes
|
|
||||||
func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) {
|
|
||||||
log.Debug("fetching state iplds")
|
|
||||||
stateNodes := make([]StateNode, 0, len(cids))
|
|
||||||
for _, stateNode := range cids {
|
|
||||||
if stateNode.CID == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
blockNumber, err := strconv.ParseUint(stateNode.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
stateBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, stateNode.MhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
stateNodes = append(stateNodes, StateNode{
|
|
||||||
IPLD: models.IPLDModel{
|
|
||||||
BlockNumber: stateNode.BlockNumber,
|
|
||||||
Data: stateBytes,
|
|
||||||
Key: stateNode.CID,
|
|
||||||
},
|
|
||||||
StateLeafKey: common.HexToHash(stateNode.StateKey),
|
|
||||||
Type: ResolveToNodeType(stateNode.NodeType),
|
|
||||||
Path: stateNode.Path,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return stateNodes, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FetchStorage fetches storage nodes
|
|
||||||
func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithStateKeyModel) ([]StorageNode, error) {
|
|
||||||
log.Debug("fetching storage iplds")
|
|
||||||
storageNodes := make([]StorageNode, 0, len(cids))
|
|
||||||
for _, storageNode := range cids {
|
|
||||||
if storageNode.CID == "" || storageNode.StateKey == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
blockNumber, err := strconv.ParseUint(storageNode.BlockNumber, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
storageBytes, err := shared.FetchIPLDByMhKeyAndBlockNumber(tx, storageNode.MhKey, blockNumber)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
storageNodes = append(storageNodes, StorageNode{
|
|
||||||
IPLD: models.IPLDModel{
|
|
||||||
BlockNumber: storageNode.BlockNumber,
|
|
||||||
Data: storageBytes,
|
|
||||||
Key: storageNode.CID,
|
|
||||||
},
|
|
||||||
StateLeafKey: common.HexToHash(storageNode.StateKey),
|
|
||||||
StorageLeafKey: common.HexToHash(storageNode.StorageKey),
|
|
||||||
Type: ResolveToNodeType(storageNode.NodeType),
|
|
||||||
Path: storageNode.Path,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return storageNodes, nil
|
|
||||||
}
|
|
@ -1,75 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package eth_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/ethereum/go-ethereum/params"
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
|
||||||
"github.com/jmoiron/sqlx"
|
|
||||||
. "github.com/onsi/ginkgo"
|
|
||||||
. "github.com/onsi/gomega"
|
|
||||||
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers"
|
|
||||||
"github.com/cerc-io/ipld-eth-server/v4/pkg/shared"
|
|
||||||
)
|
|
||||||
|
|
||||||
var _ = Describe("IPLDFetcher", func() {
|
|
||||||
var (
|
|
||||||
db *sqlx.DB
|
|
||||||
pubAndIndexer interfaces.StateDiffIndexer
|
|
||||||
fetcher *eth.IPLDFetcher
|
|
||||||
)
|
|
||||||
Describe("Fetch", func() {
|
|
||||||
BeforeEach(func() {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
tx interfaces.Batch
|
|
||||||
)
|
|
||||||
db = shared.SetupDB()
|
|
||||||
pubAndIndexer = shared.SetupTestStateDiffIndexer(ctx, params.TestChainConfig, test_helpers.Genesis.Hash())
|
|
||||||
|
|
||||||
tx, err = pubAndIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
|
|
||||||
for _, node := range test_helpers.MockStateNodes {
|
|
||||||
err = pubAndIndexer.PushStateNode(tx, node, test_helpers.MockBlock.Hash().String())
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
}
|
|
||||||
|
|
||||||
err = tx.Submit(err)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
fetcher = eth.NewIPLDFetcher(db)
|
|
||||||
|
|
||||||
})
|
|
||||||
AfterEach(func() {
|
|
||||||
shared.TearDownDB(db)
|
|
||||||
})
|
|
||||||
|
|
||||||
It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() {
|
|
||||||
iplds, err := fetcher.Fetch(*test_helpers.MockCIDWrapper)
|
|
||||||
Expect(err).ToNot(HaveOccurred())
|
|
||||||
Expect(iplds).ToNot(BeNil())
|
|
||||||
Expect(iplds.TotalDifficulty).To(Equal(test_helpers.MockConvertedPayload.TotalDifficulty))
|
|
||||||
Expect(iplds.BlockNumber).To(Equal(test_helpers.MockConvertedPayload.Block.Number()))
|
|
||||||
Expect(iplds.Header).To(Equal(test_helpers.MockIPLDs.Header))
|
|
||||||
Expect(len(iplds.Uncles)).To(Equal(0))
|
|
||||||
Expect(iplds.Transactions).To(Equal(test_helpers.MockIPLDs.Transactions))
|
|
||||||
Expect(iplds.Receipts).To(Equal(test_helpers.MockIPLDs.Receipts))
|
|
||||||
Expect(iplds.StateNodes).To(Equal(test_helpers.MockIPLDs.StateNodes))
|
|
||||||
Expect(iplds.StorageNodes).To(Equal(test_helpers.MockIPLDs.StorageNodes))
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
@ -1,37 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package serve
|
|
||||||
|
|
||||||
import "github.com/cerc-io/ipld-eth-server/v4/pkg/log"
|
|
||||||
|
|
||||||
func sendNonBlockingErr(sub Subscription, err error) {
|
|
||||||
log.Error(err)
|
|
||||||
select {
|
|
||||||
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Flag: EmptyFlag}:
|
|
||||||
default:
|
|
||||||
log.Infof("unable to send error to subscription %s", sub.ID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func sendNonBlockingQuit(sub Subscription) {
|
|
||||||
select {
|
|
||||||
case sub.QuitChan <- true:
|
|
||||||
log.Infof("closing subscription %s", sub.ID)
|
|
||||||
default:
|
|
||||||
log.Infof("unable to close subscription %s; channel has no receiver", sub.ID)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,60 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2019 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package serve
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Flag int32
|
|
||||||
|
|
||||||
const (
|
|
||||||
EmptyFlag Flag = iota
|
|
||||||
BackFillCompleteFlag
|
|
||||||
)
|
|
||||||
|
|
||||||
// Subscription holds the information for an individual client subscription to the watcher
|
|
||||||
type Subscription struct {
|
|
||||||
ID rpc.ID
|
|
||||||
PayloadChan chan<- SubscriptionPayload
|
|
||||||
QuitChan chan<- bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscriptionPayload is the struct for a watcher data subscription payload
|
|
||||||
// It carries data of a type specific to the chain being supported/queried and an error message
|
|
||||||
type SubscriptionPayload struct {
|
|
||||||
Data []byte `json:"data"` // e.g. for Ethereum rlp serialized eth.StreamPayload
|
|
||||||
Height int64 `json:"height"`
|
|
||||||
Err string `json:"err"` // field for error
|
|
||||||
Flag Flag `json:"flag"` // field for message
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sp SubscriptionPayload) Error() error {
|
|
||||||
if sp.Err == "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return errors.New(sp.Err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sp SubscriptionPayload) BackFillComplete() bool {
|
|
||||||
if sp.Flag == BackFillCompleteFlag {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user