Clean up #231

Merged
telackey merged 2 commits from ian/v5_dev into v5_incremental 2023-03-02 04:26:41 +00:00
10 changed files with 50 additions and 1505 deletions
Showing only changes of commit 8ec203ec3d - Show all commits

View File

@ -25,15 +25,14 @@ import (
"sync"
"time"
"github.com/mailgun/groupcache/v2"
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/mailgun/groupcache/v2"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/vulcanize/gap-filler/pkg/mux"
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
"github.com/cerc-io/ipld-eth-server/v4/pkg/graphql"
srpc "github.com/cerc-io/ipld-eth-server/v4/pkg/rpc"
s "github.com/cerc-io/ipld-eth-server/v4/pkg/serve"
@ -59,7 +58,6 @@ var serveCmd = &cobra.Command{
func serve() {
logWithCommand.Infof("running ipld-eth-server version: %s", v.VersionWithMeta)
var forwardPayloadChan chan eth.ConvertedPayload
wg := new(sync.WaitGroup)
logWithCommand.Debug("loading server configuration variables")
serverConfig, err := s.NewConfig()
@ -74,8 +72,7 @@ func serve() {
}
logWithCommand.Info("starting up server servers")
forwardPayloadChan = make(chan eth.ConvertedPayload, s.PayloadChanBufferSize)
server.Serve(wg, forwardPayloadChan)
server.Serve(wg)
if err := startServers(server, serverConfig); err != nil {
logWithCommand.Fatal(err)
}

View File

@ -110,7 +110,6 @@ type Backend struct {
// postgres db interfaces
Retriever *CIDRetriever
Fetcher *IPLDFetcher
IPLDRetriever *IPLDRetriever
// ethereum interfaces
@ -148,7 +147,6 @@ func NewEthBackend(db *sqlx.DB, c *Config) (*Backend, error) {
return &Backend{
DB: db,
Retriever: r,
Fetcher: NewIPLDFetcher(db),
IPLDRetriever: NewIPLDRetriever(db),
EthDB: ethDB,
StateDatabase: state.NewDatabase(ethDB),

View File

@ -19,27 +19,16 @@ package eth
import (
"fmt"
"math/big"
"strconv"
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"github.com/cerc-io/ipld-eth-server/v4/pkg/shared"
)
// Retriever interface for substituting mocks in tests
type Retriever interface {
RetrieveFirstBlockNumber() (int64, error)
RetrieveLastBlockNumber() (int64, error)
Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDWrapper, bool, error)
}
// CIDRetriever satisfies the CIDRetriever interface for ethereum
type CIDRetriever struct {
db *sqlx.DB
@ -128,158 +117,6 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
return blockNumber, err
}
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64) ([]CIDWrapper, bool, error) {
log.Debug("retrieving cids")
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return nil, true, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Retrieve cached header CIDs at this block height
var headers []models.HeaderModel
headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
log.Error("header cid retrieval error", err)
return nil, true, err
}
cws := make([]CIDWrapper, len(headers))
empty := true
for i, header := range headers {
cw := new(CIDWrapper)
cw.BlockNumber = big.NewInt(blockNumber)
if !filter.HeaderFilter.Off {
cw.Header = header
empty = false
if filter.HeaderFilter.Uncles {
// Retrieve uncle cids for this header id
var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, header.BlockHash)
if err != nil {
log.Error("uncle cid retrieval error")
return nil, true, err
}
cw.Uncles = uncleCIDs
}
}
// Retrieve cached trx CIDs
if !filter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, filter.TxFilter, header.BlockHash)
if err != nil {
log.Error("transaction cid retrieval error")
return nil, true, err
}
if len(cw.Transactions) > 0 {
empty = false
}
}
trxHashes := make([]string, len(cw.Transactions))
for j, t := range cw.Transactions {
trxHashes[j] = t.TxHash
}
// Retrieve cached receipt CIDs
if !filter.ReceiptFilter.Off {
cw.Receipts, err = ecr.RetrieveRctCIDs(tx, filter.ReceiptFilter, 0, header.BlockHash, trxHashes)
if err != nil {
log.Error("receipt cid retrieval error")
return nil, true, err
}
if len(cw.Receipts) > 0 {
empty = false
}
}
// Retrieve cached state CIDs
if !filter.StateFilter.Off {
cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, filter.StateFilter, header.BlockHash)
if err != nil {
log.Error("state cid retrieval error")
return nil, true, err
}
if len(cw.StateNodes) > 0 {
empty = false
}
}
// Retrieve cached storage CIDs
if !filter.StorageFilter.Off {
cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, filter.StorageFilter, header.BlockHash)
if err != nil {
log.Error("storage cid retrieval error")
return nil, true, err
}
if len(cw.StorageNodes) > 0 {
empty = false
}
}
cws[i] = *cw
}
return cws, empty, err
}
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]models.HeaderModel, error) {
log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]models.HeaderModel, 0)
pgStr := `SELECT CAST(block_number as Text), block_hash, parent_hash, cid, mh_key, CAST(td as Text), node_id,
CAST(reward as Text), state_root, uncle_root,tx_root, receipt_root, bloom, timestamp, times_validated, coinbase
FROM eth.header_cids
WHERE block_number = $1`
return headers, tx.Select(&headers, pgStr, blockNumber)
}
// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header
func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID string) ([]models.UncleModel, error) {
log.Debug("retrieving uncle cids for block id ", headerID)
headers := make([]models.UncleModel, 0)
pgStr := `SELECT CAST(block_number as Text), header_id, block_hash, parent_hash, cid, mh_key, CAST(reward as text)
FROM eth.uncle_cids
WHERE header_id = $1`
return headers, tx.Select(&headers, pgStr, headerID)
}
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID string) ([]models.TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3)
results := make([]models.TxModel, 0)
id := 1
pgStr := fmt.Sprintf(`SELECT CAST(transaction_cids.block_number as Text), transaction_cids.tx_hash,
transaction_cids.header_id, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.dst,
transaction_cids.src, transaction_cids.index, transaction_cids.tx_data, transaction_cids.tx_type
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
WHERE header_cids.block_hash = $%d`, id)
args = append(args, headerID)
id++
if len(txFilter.Dst) > 0 {
pgStr += fmt.Sprintf(` AND transaction_cids.dst = ANY($%d::VARCHAR(66)[])`, id)
args = append(args, pq.Array(txFilter.Dst))
id++
}
if len(txFilter.Src) > 0 {
pgStr += fmt.Sprintf(` AND transaction_cids.src = ANY($%d::VARCHAR(66)[])`, id)
args = append(args, pq.Array(txFilter.Src))
}
pgStr += ` ORDER BY transaction_cids.index`
return results, tx.Select(&results, pgStr, args...)
}
func topicFilterCondition(id *int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}) {
for i, topicSet := range topics {
if len(topicSet) == 0 {
@ -439,38 +276,6 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilte
return logCIDs, nil
}
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash string, txHashes []string) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5)
pgStr := `SELECT CAST(receipt_cids.block_number as Text), receipt_cids.header_id, receipt_cids.tx_id,
receipt_cids.leaf_cid, receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
AND transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number`
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
id++
}
if blockHash != "" {
pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id)
args = append(args, blockHash)
id++
}
pgStr, args = receiptFilterConditions(&id, pgStr, args, rctFilter, txHashes)
pgStr += ` ORDER BY transaction_cids.index`
receiptCIDs := make([]models.ReceiptModel, 0)
return receiptCIDs, tx.Select(&receiptCIDs, pgStr, args...)
}
func hasTopics(topics [][]string) bool {
for _, topicSet := range topics {
if len(topicSet) > 0 {
@ -480,179 +285,6 @@ func hasTopics(topics [][]string) bool {
return false
}
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID string) ([]models.StateNodeModel, error) {
log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2)
pgStr := `SELECT CAST(state_cids.block_number as Text), state_cids.header_id,
state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path
FROM eth.state_cids
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
WHERE header_cids.block_hash = $1`
args = append(args, headerID)
addrLen := len(stateFilter.Addresses)
if addrLen > 0 {
keys := make([]string, addrLen)
for i, addr := range stateFilter.Addresses {
keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String()
}
pgStr += ` AND state_cids.state_leaf_key = ANY($2::VARCHAR(66)[])`
args = append(args, pq.Array(keys))
}
if !stateFilter.IntermediateNodes {
pgStr += ` AND state_cids.node_type = 2`
}
stateNodeCIDs := make([]models.StateNodeModel, 0)
return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...)
}
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID string) ([]models.StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3)
pgStr := `SELECT CAST(storage_cids.block_number as Text), storage_cids.header_id, storage_cids.storage_leaf_key,
storage_cids.node_type, storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, storage_cids.state_path,
state_cids.state_leaf_key
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.header_id = state_cids.header_id
AND storage_cids.state_path = state_cids.state_path
AND storage_cids.block_number = state_cids.block_number
AND state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
AND header_cids.block_hash = $1`
args = append(args, headerID)
id := 2
addrLen := len(storageFilter.Addresses)
if addrLen > 0 {
keys := make([]string, addrLen)
for i, addr := range storageFilter.Addresses {
keys[i] = crypto.Keccak256Hash(common.HexToAddress(addr).Bytes()).String()
}
pgStr += fmt.Sprintf(` AND state_cids.state_leaf_key = ANY($%d::VARCHAR(66)[])`, id)
args = append(args, pq.Array(keys))
id++
}
if len(storageFilter.StorageKeys) > 0 {
pgStr += fmt.Sprintf(` AND storage_cids.storage_leaf_key = ANY($%d::VARCHAR(66)[])`, id)
args = append(args, pq.Array(storageFilter.StorageKeys))
}
if !storageFilter.IntermediateNodes {
pgStr += ` AND storage_cids.node_type = 2`
}
storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0)
return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...)
}
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return models.HeaderModel{}, nil, nil, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
var headerCID models.HeaderModel
headerCID, err = ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil {
log.Error("header cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
blockNumber, err := strconv.ParseInt(headerCID.BlockNumber, 10, 64)
if err != nil {
return models.HeaderModel{}, nil, nil, nil, err
}
var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.BlockHash)
if err != nil {
log.Error("uncle cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
var txCIDs []models.TxModel
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash, blockNumber)
if err != nil {
log.Error("tx cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
txHashes := make([]string, len(txCIDs))
for i, txCID := range txCIDs {
txHashes[i] = txCID.TxHash
}
var rctCIDs []models.ReceiptModel
rctCIDs, err = ecr.RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx, headerCID.BlockHash, txHashes, blockNumber)
if err != nil {
log.Error("rct cid retrieval error")
}
return headerCID, uncleCIDs, txCIDs, rctCIDs, err
}
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return models.HeaderModel{}, nil, nil, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
var headerCID []models.HeaderModel
headerCID, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
log.Error("header cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
if len(headerCID) < 1 {
return models.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
}
var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].BlockHash)
if err != nil {
log.Error("uncle cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
var txCIDs []models.TxModel
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].BlockHash, blockNumber)
if err != nil {
log.Error("tx cid retrieval error")
return models.HeaderModel{}, nil, nil, nil, err
}
txHashes := make([]string, len(txCIDs))
for i, txCID := range txCIDs {
txHashes[i] = txCID.TxHash
}
var rctCIDs []models.ReceiptModel
rctCIDs, err = ecr.RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx, headerCID[0].BlockHash, txHashes, blockNumber)
if err != nil {
log.Error("rct cid retrieval error")
}
return headerCID[0], uncleCIDs, txCIDs, rctCIDs, err
}
// RetrieveHeaderCIDByHash returns the header for the given block hash
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (models.HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockHash.String())
@ -663,35 +295,6 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H
return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
}
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID string, blockNumber int64) ([]models.TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT CAST(block_number as Text), header_id, index, tx_hash, cid, mh_key,
dst, src, tx_data, tx_type, value
FROM eth.transaction_cids
WHERE header_id = $1 AND block_number = $2
ORDER BY index`
var txCIDs []models.TxModel
return txCIDs, tx.Select(&txCIDs, pgStr, headerID, blockNumber)
}
// RetrieveReceiptCIDsByByHeaderIDAndTxIDs retrieves receipt CIDs by their associated tx IDs for the given header id
func (ecr *CIDRetriever) RetrieveReceiptCIDsByByHeaderIDAndTxIDs(tx *sqlx.Tx, headerID string, txHashes []string, blockNumber int64) ([]models.ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx hashes %v", txHashes)
pgStr := `SELECT CAST(receipt_cids.block_number as Text), receipt_cids.header_id, receipt_cids.tx_id, receipt_cids.leaf_cid,
receipt_cids.leaf_mh_key, receipt_cids.contract, receipt_cids.contract_hash
FROM eth.receipt_cids, eth.transaction_cids
WHERE tx_id = ANY($2)
AND receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
AND transaction_cids.header_id = $1
AND transaction_cids.block_number = $3
ORDER BY transaction_cids.index`
var rctCIDs []models.ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, headerID, pq.Array(txHashes), blockNumber)
}
// RetrieveHeaderAndTxCIDsByBlockNumber retrieves header CIDs and their associated tx CIDs by block number
func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockNumber(blockNumber int64) ([]HeaderCIDRecord, error) {
log.Debug("retrieving header cids and tx cids for block number ", blockNumber)

View File

@ -17,197 +17,18 @@
package eth_test
import (
"math/big"
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth/test_helpers"
"github.com/cerc-io/ipld-eth-server/v4/pkg/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
openFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{},
TxFilter: eth.TxFilter{},
ReceiptFilter: eth.ReceiptFilter{},
StateFilter: eth.StateFilter{},
StorageFilter: eth.StorageFilter{},
}
rctAddressFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
LogAddresses: []string{test_helpers.Address.String()},
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000004"}},
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsAndAddressFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{
{"0x0000000000000000000000000000000000000000000000000000000000000004"},
{"0x0000000000000000000000000000000000000000000000000000000000000006"},
},
LogAddresses: []string{test_helpers.Address.String()},
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsAndAddressFilterFail = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{
{"0x0000000000000000000000000000000000000000000000000000000000000004"},
{"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt
},
LogAddresses: []string{test_helpers.Address.String()},
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctAddressesAndTopicFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}},
LogAddresses: []string{test_helpers.Address.String(), test_helpers.AnotherAddress.String()},
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctsForAllCollectedTrxs = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter
ReceiptFilter: eth.ReceiptFilter{
MatchTxs: true,
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have
LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctsForSelectCollectedTrxs = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Dst: []string{test_helpers.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt
},
ReceiptFilter: eth.ReceiptFilter{
MatchTxs: true,
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have
LogAddresses: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
},
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
stateFilter = eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: eth.ReceiptFilter{
Off: true,
},
StateFilter: eth.StateFilter{
Addresses: []string{test_helpers.AccountAddresss.Hex()},
},
StorageFilter: eth.StorageFilter{
Off: true,
},
}
)
var _ = Describe("Retriever", func() {
var (
db *sqlx.DB
@ -236,196 +57,6 @@ var _ = Describe("Retriever", func() {
err = tx.Submit(err)
Expect(err).ToNot(HaveOccurred())
})
It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() {
type rctCIDAndMHKeyResult struct {
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
}
expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0)
pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err := db.Select(&expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
cids, empty, err := retriever.Retrieve(openFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids)).To(Equal(1))
Expect(cids[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
expectedHeaderCID := test_helpers.MockCIDWrapper.Header
expectedHeaderCID.BlockHash = cids[0].Header.BlockHash
expectedHeaderCID.NodeID = cids[0].Header.NodeID
Expect(cids[0].Header).To(Equal(expectedHeaderCID))
Expect(len(cids[0].Transactions)).To(Equal(4))
Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[0].CID)).To(BeTrue())
Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[1].CID)).To(BeTrue())
Expect(eth.TxModelsContainsCID(cids[0].Transactions, test_helpers.MockCIDWrapper.Transactions[2].CID)).To(BeTrue())
Expect(len(cids[0].Receipts)).To(Equal(4))
Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[0].LeafCID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[1].LeafCID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cids[0].Receipts, expectedRctCIDsAndLeafNodes[2].LeafCID)).To(BeTrue())
Expect(len(cids[0].StateNodes)).To(Equal(2))
for _, stateNode := range cids[0].StateNodes {
if stateNode.CID == test_helpers.State1CID.String() {
Expect(stateNode.StateKey).To(Equal(common.BytesToHash(test_helpers.ContractLeafKey).Hex()))
Expect(stateNode.NodeType).To(Equal(2))
Expect(stateNode.Path).To(Equal([]byte{'\x06'}))
}
if stateNode.CID == test_helpers.State2CID.String() {
Expect(stateNode.StateKey).To(Equal(common.BytesToHash(test_helpers.AccountLeafKey).Hex()))
Expect(stateNode.NodeType).To(Equal(2))
Expect(stateNode.Path).To(Equal([]byte{'\x0c'}))
}
}
Expect(len(cids[0].StorageNodes)).To(Equal(1))
expectedStorageNodeCIDs := test_helpers.MockCIDWrapper.StorageNodes
expectedStorageNodeCIDs[0].HeaderID = cids[0].StorageNodes[0].HeaderID
expectedStorageNodeCIDs[0].StatePath = cids[0].StorageNodes[0].StatePath
Expect(cids[0].StorageNodes).To(Equal(expectedStorageNodeCIDs))
})
It("Applies filters from the provided config.Subscription", func() {
type rctCIDAndMHKeyResult struct {
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
}
expectedRctCIDsAndLeafNodes := make([]rctCIDAndMHKeyResult, 0)
pgStr := `SELECT receipt_cids.leaf_cid, receipt_cids.leaf_mh_key FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err := db.Select(&expectedRctCIDsAndLeafNodes, pgStr, test_helpers.BlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
cids1, empty, err := retriever.Retrieve(rctAddressFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids1)).To(Equal(1))
Expect(cids1[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids1[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids1[0].Transactions)).To(Equal(0))
Expect(len(cids1[0].StateNodes)).To(Equal(0))
Expect(len(cids1[0].StorageNodes)).To(Equal(0))
Expect(len(cids1[0].Receipts)).To(Equal(1))
expectedReceiptCID := test_helpers.MockCIDWrapper.Receipts[0]
expectedReceiptCID.TxID = cids1[0].Receipts[0].TxID
expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID
expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey
Expect(cids1[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids2, empty, err := retriever.Retrieve(rctTopicsFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids2)).To(Equal(1))
Expect(cids2[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids2[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids2[0].Transactions)).To(Equal(0))
Expect(len(cids2[0].StateNodes)).To(Equal(0))
Expect(len(cids2[0].StorageNodes)).To(Equal(0))
Expect(len(cids2[0].Receipts)).To(Equal(1))
expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[0]
expectedReceiptCID.TxID = cids2[0].Receipts[0].TxID
expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID
expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey
Expect(cids2[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids3, empty, err := retriever.Retrieve(rctTopicsAndAddressFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids3)).To(Equal(1))
Expect(cids3[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids3[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids3[0].Transactions)).To(Equal(0))
Expect(len(cids3[0].StateNodes)).To(Equal(0))
Expect(len(cids3[0].StorageNodes)).To(Equal(0))
Expect(len(cids3[0].Receipts)).To(Equal(1))
expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[0]
expectedReceiptCID.TxID = cids3[0].Receipts[0].TxID
expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[0].LeafCID
expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[0].LeafMhKey
Expect(cids3[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids4, empty, err := retriever.Retrieve(rctAddressesAndTopicFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids4)).To(Equal(1))
Expect(cids4[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids4[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids4[0].Transactions)).To(Equal(0))
Expect(len(cids4[0].StateNodes)).To(Equal(0))
Expect(len(cids4[0].StorageNodes)).To(Equal(0))
Expect(len(cids4[0].Receipts)).To(Equal(1))
expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[1]
expectedReceiptCID.TxID = cids4[0].Receipts[0].TxID
expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[1].LeafCID
expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[1].LeafMhKey
Expect(cids4[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids5, empty, err := retriever.Retrieve(rctsForAllCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids5)).To(Equal(1))
Expect(cids5[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids5[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids5[0].Transactions)).To(Equal(4))
Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx1CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx2CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx3CID.String())).To(BeTrue())
Expect(len(cids5[0].StateNodes)).To(Equal(0))
Expect(len(cids5[0].StorageNodes)).To(Equal(0))
Expect(len(cids5[0].Receipts)).To(Equal(4))
Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[0].LeafCID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[1].LeafCID)).To(BeTrue())
Expect(eth.ReceiptModelsContainsCID(cids5[0].Receipts, expectedRctCIDsAndLeafNodes[2].LeafCID)).To(BeTrue())
cids6, empty, err := retriever.Retrieve(rctsForSelectCollectedTrxs, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids6)).To(Equal(1))
Expect(cids6[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids6[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids6[0].Transactions)).To(Equal(1))
expectedTxCID := test_helpers.MockCIDWrapper.Transactions[1]
expectedTxCID.TxHash = cids6[0].Transactions[0].TxHash
expectedTxCID.HeaderID = cids6[0].Transactions[0].HeaderID
Expect(cids6[0].Transactions[0]).To(Equal(expectedTxCID))
Expect(len(cids6[0].StateNodes)).To(Equal(0))
Expect(len(cids6[0].StorageNodes)).To(Equal(0))
Expect(len(cids6[0].Receipts)).To(Equal(1))
expectedReceiptCID = test_helpers.MockCIDWrapper.Receipts[1]
expectedReceiptCID.TxID = cids6[0].Receipts[0].TxID
expectedReceiptCID.LeafCID = expectedRctCIDsAndLeafNodes[1].LeafCID
expectedReceiptCID.LeafMhKey = expectedRctCIDsAndLeafNodes[1].LeafMhKey
Expect(cids6[0].Receipts[0]).To(Equal(expectedReceiptCID))
cids7, empty, err := retriever.Retrieve(stateFilter, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).ToNot(BeTrue())
Expect(len(cids7)).To(Equal(1))
Expect(cids7[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids7[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids7[0].Transactions)).To(Equal(0))
Expect(len(cids7[0].Receipts)).To(Equal(0))
Expect(len(cids7[0].StorageNodes)).To(Equal(0))
Expect(len(cids7[0].StateNodes)).To(Equal(1))
Expect(cids7[0].StateNodes[0]).To(Equal(models.StateNodeModel{
BlockNumber: "1",
HeaderID: cids7[0].StateNodes[0].HeaderID,
NodeType: 2,
StateKey: common.BytesToHash(test_helpers.AccountLeafKey).Hex(),
CID: test_helpers.State2CID.String(),
MhKey: test_helpers.State2MhKey,
Path: []byte{'\x0c'},
}))
_, empty, err = retriever.Retrieve(rctTopicsAndAddressFilterFail, 1)
Expect(err).ToNot(HaveOccurred())
Expect(empty).To(BeTrue())
})
})
Describe("RetrieveFirstBlockNumber", func() {

View File

@ -28,24 +28,9 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/lib/pq"
)
const (
RetrieveHeadersByHashesPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (
header_cids.mh_key = blocks.key
AND header_cids.block_number = blocks.block_number
)
WHERE block_hash = ANY($1::VARCHAR(66)[])`
RetrieveHeadersByBlockNumberPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (
header_cids.mh_key = blocks.key
AND header_cids.block_number = blocks.block_number
)
WHERE header_cids.block_number = $1`
RetrieveHeaderByHashPgStr = `SELECT cid, data
FROM eth.header_cids
INNER JOIN public.blocks ON (
@ -53,13 +38,6 @@ const (
AND header_cids.block_number = blocks.block_number
)
WHERE block_hash = $1`
RetrieveUnclesByHashesPgStr = `SELECT cid, data
FROM eth.uncle_cids
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_hash = ANY($1::VARCHAR(66)[])`
RetrieveUnclesPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (
@ -85,31 +63,6 @@ const (
)
WHERE header_cids.block_hash = $1
ORDER BY uncle_cids.parent_hash`
RetrieveUnclesByBlockNumberPgStr = `SELECT uncle_cids.cid, data
FROM eth.uncle_cids
INNER JOIN eth.header_cids ON (
uncle_cids.header_id = header_cids.block_hash
AND uncle_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE header_cids.block_number = $1`
RetrieveUncleByHashPgStr = `SELECT cid, data
FROM eth.uncle_cids
INNER JOIN public.blocks ON (
uncle_cids.mh_key = blocks.key
AND uncle_cids.block_number = blocks.block_number
)
WHERE block_hash = $1`
RetrieveTransactionsByHashesPgStr = `SELECT DISTINCT ON (tx_hash) cid, data
FROM eth.transaction_cids
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE tx_hash = ANY($1::VARCHAR(66)[])`
RetrieveTransactionsPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (
@ -135,39 +88,6 @@ const (
)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveTransactionsByBlockNumberPgStr = `SELECT transaction_cids.cid, data
FROM eth.transaction_cids
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE header_cids.block_number = $1
AND block_hash = (SELECT canonical_header_hash(header_cids.block_number))
ORDER BY eth.transaction_cids.index ASC`
RetrieveTransactionByHashPgStr = `SELECT DISTINCT ON (tx_hash) cid, data
FROM eth.transaction_cids
INNER JOIN public.blocks ON (
transaction_cids.mh_key = blocks.key
AND transaction_cids.block_number = blocks.block_number
)
WHERE tx_hash = $1`
RetrieveReceiptsByTxHashesPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE tx_hash = ANY($1::VARCHAR(66)[])
AND transaction_cids.header_id = (SELECT canonical_header_hash(transaction_cids.block_number))`
RetrieveReceiptsPgStr = `SELECT receipt_cids.leaf_cid, data, eth.transaction_cids.tx_hash
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (
@ -203,37 +123,6 @@ const (
)
WHERE block_hash = $1
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptsByBlockNumberPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN eth.header_cids ON (
transaction_cids.header_id = header_cids.block_hash
AND transaction_cids.block_number = header_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE header_cids.block_number = $1
AND block_hash = (SELECT canonical_header_hash(header_cids.block_number))
ORDER BY eth.transaction_cids.index ASC`
RetrieveReceiptByTxHashPgStr = `SELECT receipt_cids.leaf_cid, data
FROM eth.receipt_cids
INNER JOIN eth.transaction_cids ON (
receipt_cids.tx_id = transaction_cids.tx_hash
AND receipt_cids.header_id = transaction_cids.header_id
AND receipt_cids.block_number = transaction_cids.block_number
)
INNER JOIN public.blocks ON (
receipt_cids.leaf_mh_key = blocks.key
AND receipt_cids.block_number = blocks.block_number
)
WHERE tx_hash = $1
AND transaction_cids.header_id = (SELECT canonical_header_hash(transaction_cids.block_number))`
RetrieveAccountByLeafKeyAndBlockHashPgStr = `SELECT state_cids.cid, state_cids.mh_key, state_cids.block_number, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (
@ -247,17 +136,6 @@ const (
AND header_cids.block_hash = (SELECT canonical_header_hash(header_cids.block_number))
ORDER BY header_cids.block_number DESC
LIMIT 1`
RetrieveAccountByLeafKeyAndBlockNumberPgStr = `SELECT state_cids.cid, state_cids.mh_key, state_cids.node_type
FROM eth.state_cids
INNER JOIN eth.header_cids ON (
state_cids.header_id = header_cids.block_hash
AND state_cids.block_number = header_cids.block_number
)
WHERE state_leaf_key = $1
AND header_cids.block_number <= $2
ORDER BY header_cids.block_number DESC
LIMIT 1`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr = `SELECT cid, mh_key, block_number, node_type, state_leaf_removed FROM get_storage_at_by_number($1, $2, $3)`
RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockHashPgStr = `SELECT cid, mh_key, block_number, node_type, state_leaf_removed FROM get_storage_at_by_hash($1, $2, $3)`
)
@ -285,66 +163,12 @@ func NewIPLDRetriever(db *sqlx.DB) *IPLDRetriever {
}
}
// RetrieveHeadersByHashes returns the cids and rlp bytes for the headers corresponding to the provided block hashes
func (r *IPLDRetriever) RetrieveHeadersByHashes(hashes []common.Hash) ([]string, [][]byte, error) {
headerResults := make([]ipldResult, 0)
hashStrs := make([]string, len(hashes))
for i, hash := range hashes {
hashStrs[i] = hash.Hex()
}
if err := r.db.Select(&headerResults, RetrieveHeadersByHashesPgStr, pq.Array(hashStrs)); err != nil {
return nil, nil, err
}
cids := make([]string, len(headerResults))
headers := make([][]byte, len(headerResults))
for i, res := range headerResults {
cids[i] = res.CID
headers[i] = res.Data
}
return cids, headers, nil
}
// RetrieveHeadersByBlockNumber returns the cids and rlp bytes for the headers corresponding to the provided block number
// This can return more than one result since there can be more than one header (non-canonical headers)
func (r *IPLDRetriever) RetrieveHeadersByBlockNumber(number uint64) ([]string, [][]byte, error) {
headerResults := make([]ipldResult, 0)
if err := r.db.Select(&headerResults, RetrieveHeadersByBlockNumberPgStr, number); err != nil {
return nil, nil, err
}
cids := make([]string, len(headerResults))
headers := make([][]byte, len(headerResults))
for i, res := range headerResults {
cids[i] = res.CID
headers[i] = res.Data
}
return cids, headers, nil
}
// RetrieveHeaderByHash returns the cid and rlp bytes for the header corresponding to the provided block hash
func (r *IPLDRetriever) RetrieveHeaderByHash(tx *sqlx.Tx, hash common.Hash) (string, []byte, error) {
headerResult := new(ipldResult)
return headerResult.CID, headerResult.Data, tx.Get(headerResult, RetrieveHeaderByHashPgStr, hash.Hex())
}
// RetrieveUnclesByHashes returns the cids and rlp bytes for the uncles corresponding to the provided uncle hashes
func (r *IPLDRetriever) RetrieveUnclesByHashes(hashes []common.Hash) ([]string, [][]byte, error) {
uncleResults := make([]ipldResult, 0)
hashStrs := make([]string, len(hashes))
for i, hash := range hashes {
hashStrs[i] = hash.Hex()
}
if err := r.db.Select(&uncleResults, RetrieveUnclesByHashesPgStr, pq.Array(hashStrs)); err != nil {
return nil, nil, err
}
cids := make([]string, len(uncleResults))
uncles := make([][]byte, len(uncleResults))
for i, res := range uncleResults {
cids[i] = res.CID
uncles[i] = res.Data
}
return cids, uncles, nil
}
// RetrieveUncles returns the cids and rlp bytes for the uncles corresponding to the provided block hash, number (of non-omner root block)
func (r *IPLDRetriever) RetrieveUncles(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, error) {
uncleResults := make([]ipldResult, 0)
@ -375,46 +199,6 @@ func (r *IPLDRetriever) RetrieveUnclesByBlockHash(tx *sqlx.Tx, hash common.Hash)
return cids, uncles, nil
}
// RetrieveUnclesByBlockNumber returns the cids and rlp bytes for the uncles corresponding to the provided block number (of non-omner root block)
func (r *IPLDRetriever) RetrieveUnclesByBlockNumber(number uint64) ([]string, [][]byte, error) {
uncleResults := make([]ipldResult, 0)
if err := r.db.Select(&uncleResults, RetrieveUnclesByBlockNumberPgStr, number); err != nil {
return nil, nil, err
}
cids := make([]string, len(uncleResults))
uncles := make([][]byte, len(uncleResults))
for i, res := range uncleResults {
cids[i] = res.CID
uncles[i] = res.Data
}
return cids, uncles, nil
}
// RetrieveUncleByHash returns the cid and rlp bytes for the uncle corresponding to the provided uncle hash
func (r *IPLDRetriever) RetrieveUncleByHash(hash common.Hash) (string, []byte, error) {
uncleResult := new(ipldResult)
return uncleResult.CID, uncleResult.Data, r.db.Get(uncleResult, RetrieveUncleByHashPgStr, hash.Hex())
}
// RetrieveTransactionsByHashes returns the cids and rlp bytes for the transactions corresponding to the provided tx hashes
func (r *IPLDRetriever) RetrieveTransactionsByHashes(hashes []common.Hash) ([]string, [][]byte, error) {
txResults := make([]ipldResult, 0)
hashStrs := make([]string, len(hashes))
for i, hash := range hashes {
hashStrs[i] = hash.Hex()
}
if err := r.db.Select(&txResults, RetrieveTransactionsByHashesPgStr, pq.Array(hashStrs)); err != nil {
return nil, nil, err
}
cids := make([]string, len(txResults))
txs := make([][]byte, len(txResults))
for i, res := range txResults {
cids[i] = res.CID
txs[i] = res.Data
}
return cids, txs, nil
}
// RetrieveTransactions returns the cids and rlp bytes for the transactions corresponding to the provided block hash, number
func (r *IPLDRetriever) RetrieveTransactions(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, error) {
txResults := make([]ipldResult, 0)
@ -445,27 +229,6 @@ func (r *IPLDRetriever) RetrieveTransactionsByBlockHash(tx *sqlx.Tx, hash common
return cids, txs, nil
}
// RetrieveTransactionsByBlockNumber returns the cids and rlp bytes for the transactions corresponding to the provided block number
func (r *IPLDRetriever) RetrieveTransactionsByBlockNumber(number uint64) ([]string, [][]byte, error) {
txResults := make([]ipldResult, 0)
if err := r.db.Select(&txResults, RetrieveTransactionsByBlockNumberPgStr, number); err != nil {
return nil, nil, err
}
cids := make([]string, len(txResults))
txs := make([][]byte, len(txResults))
for i, res := range txResults {
cids[i] = res.CID
txs[i] = res.Data
}
return cids, txs, nil
}
// RetrieveTransactionByTxHash returns the cid and rlp bytes for the transaction corresponding to the provided tx hash
func (r *IPLDRetriever) RetrieveTransactionByTxHash(hash common.Hash) (string, []byte, error) {
txResult := new(ipldResult)
return txResult.CID, txResult.Data, r.db.Get(txResult, RetrieveTransactionByHashPgStr, hash.Hex())
}
// DecodeLeafNode decodes the leaf node data
func DecodeLeafNode(node []byte) ([]byte, error) {
var nodeElements []interface{}
@ -483,29 +246,6 @@ func DecodeLeafNode(node []byte) ([]byte, error) {
return nodeElements[1].([]byte), nil
}
// RetrieveReceiptsByTxHashes returns the cids and rlp bytes for the receipts corresponding to the provided tx hashes
func (r *IPLDRetriever) RetrieveReceiptsByTxHashes(hashes []common.Hash) ([]string, [][]byte, error) {
rctResults := make([]rctIpldResult, 0)
hashStrs := make([]string, len(hashes))
for i, hash := range hashes {
hashStrs[i] = hash.Hex()
}
if err := r.db.Select(&rctResults, RetrieveReceiptsByTxHashesPgStr, pq.Array(hashStrs)); err != nil {
return nil, nil, err
}
cids := make([]string, len(rctResults))
rcts := make([][]byte, len(rctResults))
for i, res := range rctResults {
cids[i] = res.LeafCID
nodeVal, err := DecodeLeafNode(res.Data)
if err != nil {
return nil, nil, err
}
rcts[i] = nodeVal
}
return cids, rcts, nil
}
// RetrieveReceipts returns the cids and rlp bytes for the receipts corresponding to the provided block hash, number.
// cid returned corresponds to the leaf node data which contains the receipt.
func (r *IPLDRetriever) RetrieveReceipts(tx *sqlx.Tx, hash common.Hash, number uint64) ([]string, [][]byte, []common.Hash, error) {
@ -554,41 +294,6 @@ func (r *IPLDRetriever) RetrieveReceiptsByBlockHash(tx *sqlx.Tx, hash common.Has
return cids, rcts, txs, nil
}
// RetrieveReceiptsByBlockNumber returns the cids and rlp bytes for the receipts corresponding to the provided block hash.
// cid returned corresponds to the leaf node data which contains the receipt.
func (r *IPLDRetriever) RetrieveReceiptsByBlockNumber(number uint64) ([]string, [][]byte, error) {
rctResults := make([]rctIpldResult, 0)
if err := r.db.Select(&rctResults, RetrieveReceiptsByBlockNumberPgStr, number); err != nil {
return nil, nil, err
}
cids := make([]string, len(rctResults))
rcts := make([][]byte, len(rctResults))
for i, res := range rctResults {
cids[i] = res.LeafCID
nodeVal, err := DecodeLeafNode(res.Data)
if err != nil {
return nil, nil, err
}
rcts[i] = nodeVal
}
return cids, rcts, nil
}
// RetrieveReceiptByHash returns the cid and rlp bytes for the receipt corresponding to the provided tx hash.
// cid returned corresponds to the leaf node data which contains the receipt.
func (r *IPLDRetriever) RetrieveReceiptByHash(hash common.Hash) (string, []byte, error) {
rctResult := new(rctIpldResult)
if err := r.db.Select(&rctResult, RetrieveReceiptByTxHashPgStr, hash.Hex()); err != nil {
return "", nil, err
}
nodeVal, err := DecodeLeafNode(rctResult.Data)
if err != nil {
return "", nil, err
}
return rctResult.LeafCID, nodeVal, nil
}
type nodeInfo struct {
CID string `db:"cid"`
MhKey string `db:"mh_key"`
@ -630,35 +335,6 @@ func (r *IPLDRetriever) RetrieveAccountByAddressAndBlockHash(address common.Addr
return accountResult.CID, i[1].([]byte), nil
}
// RetrieveAccountByAddressAndBlockNumber returns the cid and rlp bytes for the account corresponding to the provided address and block number
// This can return a non-canonical account
func (r *IPLDRetriever) RetrieveAccountByAddressAndBlockNumber(address common.Address, number uint64) (string, []byte, error) {
accountResult := new(nodeInfo)
leafKey := crypto.Keccak256Hash(address.Bytes())
if err := r.db.Get(accountResult, RetrieveAccountByLeafKeyAndBlockNumberPgStr, leafKey.Hex(), number); err != nil {
return "", nil, err
}
if accountResult.NodeType == sdtypes.Removed.Int() {
return "", EmptyNodeValue, nil
}
var err error
accountResult.Data, err = shared.FetchIPLD(r.db, accountResult.MhKey, number)
if err != nil {
return "", nil, err
}
var i []interface{}
if err := rlp.DecodeBytes(accountResult.Data, &i); err != nil {
return "", nil, fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
}
if len(i) != 2 {
return "", nil, fmt.Errorf("eth IPLDRetriever expected state leaf node rlp to decode into two elements")
}
return accountResult.CID, i[1].([]byte), nil
}
// RetrieveStorageAtByAddressAndStorageSlotAndBlockHash returns the cid and rlp bytes for the storage value corresponding to the provided address, storage slot, and block hash
func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageSlotAndBlockHash(address common.Address, key, hash common.Hash) (string, []byte, []byte, error) {
storageResult := new(nodeInfo)
@ -690,32 +366,3 @@ func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageSlotAndBlockHash(add
}
return storageResult.CID, storageResult.Data, i[1].([]byte), nil
}
// RetrieveStorageAtByAddressAndStorageKeyAndBlockNumber returns the cid and rlp bytes for the storage value corresponding to the provided address, storage key, and block number
// This can retrun a non-canonical value
func (r *IPLDRetriever) RetrieveStorageAtByAddressAndStorageKeyAndBlockNumber(address common.Address, storageLeafKey common.Hash, number uint64) (string, []byte, error) {
storageResult := new(nodeInfo)
stateLeafKey := crypto.Keccak256Hash(address.Bytes())
if err := r.db.Get(storageResult, RetrieveStorageLeafByAddressHashAndLeafKeyAndBlockNumberPgStr, stateLeafKey.Hex(), storageLeafKey.Hex(), number); err != nil {
return "", nil, err
}
if storageResult.StateLeafRemoved || storageResult.NodeType == sdtypes.Removed.Int() {
return "", EmptyNodeValue, nil
}
var err error
storageResult.Data, err = shared.FetchIPLD(r.db, storageResult.MhKey, number)
if err != nil {
return "", nil, err
}
var i []interface{}
if err := rlp.DecodeBytes(storageResult.Data, &i); err != nil {
return "", nil, fmt.Errorf("error decoding storage leaf node rlp: %s", err.Error())
}
if len(i) != 2 {
return "", nil, fmt.Errorf("eth IPLDRetriever expected storage leaf node rlp to decode into two elements")
}
return storageResult.CID, i[1].([]byte), nil
}

View File

@ -16,38 +16,6 @@
package eth
import (
"math/big"
"github.com/spf13/viper"
)
// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the watcher
type SubscriptionSettings struct {
BackFill bool
BackFillOnly bool
Start *big.Int
End *big.Int // set to 0 or a negative value to have no ending block
HeaderFilter HeaderFilter
TxFilter TxFilter
ReceiptFilter ReceiptFilter
StateFilter StateFilter
StorageFilter StorageFilter
}
// HeaderFilter contains filter settings for headers
type HeaderFilter struct {
Off bool
Uncles bool
}
// TxFilter contains filter settings for txs
type TxFilter struct {
Off bool
Src []string
Dst []string
}
// ReceiptFilter contains filter settings for receipts
type ReceiptFilter struct {
Off bool
@ -56,70 +24,3 @@ type ReceiptFilter struct {
LogAddresses []string // receipt contains logs from the provided addresses
Topics [][]string
}
// StateFilter contains filter settings for state
type StateFilter struct {
Off bool
Addresses []string // is converted to state key by taking its keccak256 hash
IntermediateNodes bool
}
// StorageFilter contains filter settings for storage
type StorageFilter struct {
Off bool
Addresses []string
StorageKeys []string // need to be the hashs key themselves not slot position
IntermediateNodes bool
}
// Init is used to initialize a EthSubscription struct with env variables
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
sc.BackFill = viper.GetBool("watcher.ethSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("watcher.ethSubscription.historicalDataOnly")
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
sc.Start = big.NewInt(viper.GetInt64("watcher.ethSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("watcher.ethSubscription.endingBlock"))
// Below default to false, which means we get all headers and no uncles by default
sc.HeaderFilter = HeaderFilter{
Off: viper.GetBool("watcher.ethSubscription.headerFilter.off"),
Uncles: viper.GetBool("watcher.ethSubscription.headerFilter.uncles"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
sc.TxFilter = TxFilter{
Off: viper.GetBool("watcher.ethSubscription.txFilter.off"),
Src: viper.GetStringSlice("watcher.ethSubscription.txFilter.src"),
Dst: viper.GetStringSlice("watcher.ethSubscription.txFilter.dst"),
}
// By default all of the topic slices will be empty => match on any/all topics
topics := make([][]string, 4)
topics[0] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic0s")
topics[1] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic1s")
topics[2] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic2s")
topics[3] = viper.GetStringSlice("watcher.ethSubscription.receiptFilter.topic3s")
sc.ReceiptFilter = ReceiptFilter{
Off: viper.GetBool("watcher.ethSubscription.receiptFilter.off"),
MatchTxs: viper.GetBool("watcher.ethSubscription.receiptFilter.matchTxs"),
LogAddresses: viper.GetStringSlice("watcher.ethSubscription.receiptFilter.contracts"),
Topics: topics,
}
// Below defaults to two false, and a slice of length 0
// Which means we get all state leafs by default, but no intermediate nodes
sc.StateFilter = StateFilter{
Off: viper.GetBool("watcher.ethSubscription.stateFilter.off"),
IntermediateNodes: viper.GetBool("watcher.ethSubscription.stateFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("watcher.ethSubscription.stateFilter.addresses"),
}
// Below defaults to two false, and two slices of length 0
// Which means we get all storage leafs by default, but no intermediate nodes
sc.StorageFilter = StorageFilter{
Off: viper.GetBool("watcher.ethSubscription.storageFilter.off"),
IntermediateNodes: viper.GetBool("watcher.ethSubscription.storageFilter.intermediateNodes"),
Addresses: viper.GetStringSlice("watcher.ethSubscription.storageFilter.addresses"),
StorageKeys: viper.GetStringSlice("watcher.ethSubscription.storageFilter.storageKeys"),
}
return sc, nil
}

View File

@ -17,6 +17,7 @@
package test_helpers
import (
"bytes"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -27,7 +28,10 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/test_helpers"
"github.com/ipfs/go-cid"
)
// Test variables
@ -106,3 +110,40 @@ func TestChainGen(i int, block *core.BlockGen) {
block.AddTx(tx)
}
}
// GetRctLeafNodeData converts the receipts to receipt trie and returns the receipt leaf node IPLD data and
// corresponding CIDs
func GetRctLeafNodeData(rcts types.Receipts) ([]cid.Cid, [][]byte, error) {
receiptTrie := ipld.NewRctTrie()
for idx, rct := range rcts {
ethRct, err := ipld.NewReceipt(rct)
if err != nil {
return nil, nil, err
}
if err = receiptTrie.Add(idx, ethRct.RawData()); err != nil {
return nil, nil, err
}
}
rctLeafNodes, keys, err := receiptTrie.GetLeafNodes()
if err != nil {
return nil, nil, err
}
ethRctleafNodeCids := make([]cid.Cid, len(rctLeafNodes))
ethRctleafNodeData := make([][]byte, len(rctLeafNodes))
for i, rln := range rctLeafNodes {
var idx uint
r := bytes.NewReader(keys[i].TrieKey)
err = rlp.Decode(r, &idx)
if err != nil {
return nil, nil, err
}
ethRctleafNodeCids[idx] = rln.Cid()
ethRctleafNodeData[idx] = rln.RawData()
}
return ethRctleafNodeCids, ethRctleafNodeData, nil
}

View File

@ -162,7 +162,7 @@ var (
Tx3 = GetTxnRlp(2, MockTransactions)
Tx4 = GetTxnRlp(3, MockTransactions)
rctCIDs, rctIPLDData, _ = eth.GetRctLeafNodeData(MockReceipts)
rctCIDs, rctIPLDData, _ = GetRctLeafNodeData(MockReceipts)
HeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256)
HeaderMhKey = shared.MultihashKeyFromCID(HeaderCID)
Trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, Tx1, multihash.KECCAK_256)

View File

@ -17,13 +17,8 @@
package serve
import (
"context"
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/types"
"github.com/cerc-io/ipld-eth-server/v4/pkg/eth"
)
// APIName is the namespace used for the state diffing service API
@ -46,45 +41,6 @@ func NewPublicServerAPI(w Server, client *rpc.Client) *PublicServerAPI {
}
}
// Stream is the public method to setup a subscription that fires off IPLD payloads as they are processed
func (api *PublicServerAPI) Stream(ctx context.Context, params eth.SubscriptionSettings) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
// create subscription and start waiting for stream events
rpcSub := notifier.CreateSubscription()
go func() {
// subscribe to events from the SyncPublishScreenAndServe service
payloadChannel := make(chan SubscriptionPayload, PayloadChanBufferSize)
quitChan := make(chan bool, 1)
go api.w.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)
// loop and await payloads and relay them to the subscriber using notifier
for {
select {
case packet := <-payloadChannel:
if err := notifier.Notify(rpcSub.ID, packet); err != nil {
log.Error("Failed to send watcher data packet", "err", err)
api.w.Unsubscribe(rpcSub.ID)
return
}
case <-rpcSub.Err():
api.w.Unsubscribe(rpcSub.ID)
return
case <-quitChan:
// don't need to unsubscribe from the watcher, the service does so before sending the quit signal this way
return
}
}
}()
return rpcSub, nil
}
// WatchAddress makes a geth WatchAddress API call with the given operation and args
func (api *PublicServerAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error {
err := api.rpc.Call(nil, "statediff_watchAddress", operation, args)

View File

@ -17,19 +17,15 @@
package serve
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/cerc-io/ipld-eth-server/v4/pkg/log"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/tracers"
ethnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/jmoiron/sqlx"
@ -51,11 +47,7 @@ type Server interface {
APIs() []rpc.API
Protocols() []p2p.Protocol
// Pub-Sub handling event loop
Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload)
// Method to subscribe to the service
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings)
// Method to unsubscribe from the service
Unsubscribe(id rpc.ID)
Serve(wg *sync.WaitGroup)
// Backend exposes the server's backend
Backend() *eth.Backend
}
@ -64,22 +56,10 @@ type Server interface {
type Service struct {
// Used to sync access to the Subscriptions
sync.Mutex
// Interface for filtering and serving data according to subscribed clients according to their specification
Filterer eth.Filterer
// Interface for fetching IPLD objects from IPFS
IPLDFetcher eth.Fetcher
// Interface for searching and retrieving CIDs from Postgres index
Retriever eth.Retriever
// Used to signal shutdown of the service
QuitChan chan bool
// A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters)
Subscriptions map[common.Hash]map[rpc.ID]Subscription
// A mapping of subscription params hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]eth.SubscriptionSettings
// Underlying db
db *sqlx.DB
// wg for syncing serve processes
serveWg *sync.WaitGroup
// rpc client for forwarding cache misses
client *rpc.Client
// whether the proxied client supports state diffing
@ -101,13 +81,8 @@ type Service struct {
// NewServer creates a new Server using an underlying Service struct
func NewServer(settings *Config) (Server, error) {
sap := new(Service)
sap.Retriever = eth.NewCIDRetriever(settings.DB)
sap.IPLDFetcher = eth.NewIPLDFetcher(settings.DB)
sap.Filterer = eth.NewResponseFilterer()
sap.db = settings.DB
sap.QuitChan = make(chan bool)
sap.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sap.SubscriptionTypes = make(map[common.Hash]eth.SubscriptionSettings)
sap.client = settings.Client
sap.supportsStateDiffing = settings.SupportStateDiff
sap.stateDiffTimeout = settings.StateDiffTimeout
@ -177,200 +152,22 @@ func (sap *Service) APIs() []rpc.API {
// It filters and sends this data to any subscribers to the service
// This process can also be stood up alone, without an screenAndServePayload attached to a Sync process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) {
sap.serveWg = wg
func (sap *Service) Serve(wg *sync.WaitGroup) {
go func() {
wg.Add(1)
defer wg.Done()
for {
select {
case payload := <-screenAndServePayload:
sap.filterAndServe(payload)
case <-sap.QuitChan:
<-sap.QuitChan
log.Info("quiting eth ipld server process")
return
}
}
}()
log.Info("eth ipld server process successfully spun up")
}
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
func (sap *Service) filterAndServe(payload eth.ConvertedPayload) {
log.Debug("sending eth ipld payload to subscriptions")
sap.Lock()
sap.serveWg.Add(1)
defer sap.Unlock()
defer sap.serveWg.Done()
for ty, subs := range sap.Subscriptions {
// Retrieve the subscription parameters for this subscription type
subConfig, ok := sap.SubscriptionTypes[ty]
if !ok {
log.Errorf("eth ipld server subscription configuration for subscription type %s not available", ty.Hex())
sap.closeType(ty)
continue
}
if subConfig.End.Int64() > 0 && subConfig.End.Int64() < payload.Block.Number().Int64() {
// We are not out of range for this subscription type
// close it, and continue to the next
sap.closeType(ty)
continue
}
response, err := sap.Filterer.Filter(subConfig, payload)
if err != nil {
log.Errorf("eth ipld server filtering error: %v", err)
sap.closeType(ty)
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Errorf("eth ipld server rlp encoding error: %v", err)
continue
}
for id, sub := range subs {
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}:
log.Debugf("sending eth ipld server payload to subscription %s", id)
default:
log.Infof("unable to send eth ipld payload to subscription %s; channel has no receiver", id)
}
}
}
}
// Subscribe is used by the API to remotely subscribe to the service loop
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) {
sap.serveWg.Add(1)
defer sap.serveWg.Done()
log.Infof("new eth ipld subscription %s", id)
subscription := Subscription{
ID: id,
PayloadChan: sub,
QuitChan: quitChan,
}
// Subscription type is defined as the hash of the rlp-serialized subscription settings
by, err := rlp.EncodeToBytes(params)
if err != nil {
sendNonBlockingErr(subscription, err)
sendNonBlockingQuit(subscription)
return
}
subscriptionType := crypto.Keccak256Hash(by)
if !params.BackFillOnly {
// Add subscriber
sap.Lock()
if sap.Subscriptions[subscriptionType] == nil {
sap.Subscriptions[subscriptionType] = make(map[rpc.ID]Subscription)
}
sap.Subscriptions[subscriptionType][id] = subscription
sap.SubscriptionTypes[subscriptionType] = params
sap.Unlock()
}
// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.BackFill || params.BackFillOnly {
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, fmt.Errorf("eth ipld server subscription backfill error: %v", err))
sendNonBlockingQuit(subscription)
return
}
}
}
// sendHistoricalData sends historical data to the requesting subscription
func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params eth.SubscriptionSettings) error {
log.Infof("sending eth ipld historical data to subscription %s", id)
// Retrieve cached CIDs relevant to this subscriber
var endingBlock int64
var startingBlock int64
var err error
startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber()
if err != nil {
return err
}
if startingBlock < params.Start.Int64() {
startingBlock = params.Start.Int64()
}
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil {
return err
}
if endingBlock > params.End.Int64() && params.End.Int64() > 0 && params.End.Int64() > startingBlock {
endingBlock = params.End.Int64()
}
log.Debugf("eth ipld historical data starting block: %d", params.Start.Int64())
log.Debugf("eth ipld historical data ending block: %d", endingBlock)
go func() {
sap.serveWg.Add(1)
defer sap.serveWg.Done()
for i := startingBlock; i <= endingBlock; i++ {
select {
case <-sap.QuitChan:
log.Infof("ethereum historical data feed to subscription %s closed", id)
return
default:
}
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("eth ipld server cid retrieval error at block %d\r%s", i, err.Error()))
continue
}
if empty {
continue
}
for _, cids := range cidWrappers {
response, err := sap.IPLDFetcher.Fetch(cids)
if err != nil {
sendNonBlockingErr(sub, fmt.Errorf("eth ipld server ipld fetching error at block %d\r%s", i, err.Error()))
continue
}
responseRLP, err := rlp.EncodeToBytes(response)
if err != nil {
log.Error(err)
continue
}
select {
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.BlockNumber.Int64()}:
log.Debugf("eth ipld server sending historical data payload to subscription %s", id)
default:
log.Infof("eth ipld server unable to send backFill payload to subscription %s; channel has no receiver", id)
}
}
}
// when we are done backfilling send an empty payload signifying so in the msg
select {
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
log.Debugf("eth ipld server sending backFill completion notice to subscription %s", id)
default:
log.Infof("eth ipld server unable to send backFill completion notice to subscription %s", id)
}
}()
return nil
}
// Unsubscribe is used by the API to remotely unsubscribe to the StateDiffingService loop
func (sap *Service) Unsubscribe(id rpc.ID) {
log.Infof("unsubscribing %s from the eth ipld server", id)
sap.Lock()
for ty := range sap.Subscriptions {
delete(sap.Subscriptions[ty], id)
if len(sap.Subscriptions[ty]) == 0 {
// If we removed the last subscription of this type, remove the subscription type outright
delete(sap.Subscriptions, ty)
delete(sap.SubscriptionTypes, ty)
}
}
sap.Unlock()
}
// Start is used to begin the service
// This is mostly just to satisfy the node.Service interface
func (sap *Service) Start() error {
log.Info("starting eth ipld server")
wg := new(sync.WaitGroup)
payloadChan := make(chan eth.ConvertedPayload, PayloadChanBufferSize)
sap.Serve(wg, payloadChan)
sap.Serve(wg)
return nil
}
@ -380,7 +177,6 @@ func (sap *Service) Stop() error {
log.Infof("stopping eth ipld server")
sap.Lock()
close(sap.QuitChan)
sap.close()
sap.Unlock()
return nil
}
@ -389,28 +185,3 @@ func (sap *Service) Stop() error {
func (sap *Service) Backend() *eth.Backend {
return sap.backend
}
// close is used to close all listening subscriptions
// close needs to be called with subscription access locked
func (sap *Service) close() {
log.Infof("closing all eth ipld server subscriptions")
for subType, subs := range sap.Subscriptions {
for _, sub := range subs {
sendNonBlockingQuit(sub)
}
delete(sap.Subscriptions, subType)
delete(sap.SubscriptionTypes, subType)
}
}
// closeType is used to close all subscriptions of given type
// closeType needs to be called with subscription access locked
func (sap *Service) closeType(subType common.Hash) {
log.Infof("closing all eth ipld server subscriptions of type %s", subType.String())
subs := sap.Subscriptions[subType]
for _, sub := range subs {
sendNonBlockingQuit(sub)
}
delete(sap.Subscriptions, subType)
delete(sap.SubscriptionTypes, subType)
}