make subscription config rlp encodable

group subs of the same type using a hash of their config; process only once
for each sub type instead of for every sub
Ian Norden 2019-06-18 12:28:57 -05:00
parent 1d4d0cbc15
commit 23a21c14f3
13 changed files with 188 additions and 105 deletions
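In outline, the change works like this: each subscriber's config is RLP-encoded, the Keccak-256 hash of that encoding identifies the subscription "type", and subscribers are stored in a two-level map keyed first by that hash, so a payload is screened once per type rather than once per subscriber. A minimal standalone sketch of the idea, using a simplified stand-in for the real config.Subscription struct:

package main

import (
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/rlp"
)

// subConfig is a trimmed stand-in for config.Subscription; the real struct carries the full filter set
type subConfig struct {
    BackFill      bool
    StartingBlock *big.Int
    EndingBlock   *big.Int
}

// subscriptionType RLP-encodes the config and hashes it; identical configs yield identical
// hashes, so subscribers with the same filters collapse into a single subscription type
func subscriptionType(c subConfig) (common.Hash, error) {
    by, err := rlp.EncodeToBytes(c)
    if err != nil {
        return common.Hash{}, err
    }
    return common.BytesToHash(crypto.Keccak256(by)), nil
}

func main() {
    a, _ := subscriptionType(subConfig{BackFill: true, StartingBlock: big.NewInt(0), EndingBlock: big.NewInt(0)})
    b, _ := subscriptionType(subConfig{BackFill: true, StartingBlock: big.NewInt(0), EndingBlock: big.NewInt(0)})
    fmt.Println(a == b) // true: both subscribers share one type, so a payload is screened once for both
}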

View File

@@ -18,6 +18,7 @@ package cmd
 import (
     "bytes"
     "fmt"
+    "math/big"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/state"
@@ -169,8 +170,8 @@ func subscriptionConfig() {
     // Below default to 0
     // 0 start means we start at the beginning and 0 end means we continue indefinitely
-    StartingBlock: viper.GetInt64("subscription.startingBlock"),
-    EndingBlock:   viper.GetInt64("subscription.endingBlock"),
+    StartingBlock: big.NewInt(viper.GetInt64("subscription.startingBlock")),
+    EndingBlock:   big.NewInt(viper.GetInt64("subscription.endingBlock")),
     // Below default to false, which means we get all headers by default
     HeaderFilter: config.HeaderFilter{
@@ -215,7 +216,7 @@ func subscriptionConfig() {
 func getRpcClient() core.RpcClient {
     vulcPath := viper.GetString("subscription.path")
     if vulcPath == "" {
-        vulcPath = "ws://127.0.0.1:2019" // default to and try the default ws url if no path is provided
+        vulcPath = "ws://127.0.0.1:80" // default to and try the default ws url if no path is provided
     }
     rawRpcClient, err := rpc.Dial(vulcPath)
     if err != nil {

View File

@@ -48,8 +48,6 @@ func init() {
 }
 func syncPublishScreenAndServe() {
-    log.SetLevel(log.DebugLevel)
-    log.SetOutput(os.Stdout)
     blockChain, ethClient, rpcClient := getBlockChainAndClients()
     db := utils.LoadPostgres(databaseConfig, blockChain.Node())

View File

@@ -1,5 +1,6 @@
 # Seed node commands
-Another way that Vulcanizedb can serve as a caching layer for Ethereum is through the use of the `syncAndPublish` and
+Vulcanizedb can act as an index for Ethereum data stored on IPFS through the use of the `syncAndPublish` and
 `syncPublishScreenAndServe` commands.
 ## Setup
@@ -60,7 +61,7 @@ And then run the ipfs command
 Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh`
 which has usage:
-`./ipfs_postgres.sh <IPFS_PGHOST> <IPFS_PGUSER> <IPFS_PGDATABASE>`
+`./ipfs_postgres.sh <IPFS_PGHOST> <IPFS_PGPORT> <IPFS_PGUSER> <IPFS_PGDATABASE>"`
 and will ask us to enter the password, avoiding storing it to an ENV variable.

View File

@@ -16,21 +16,23 @@
 package config
+import "math/big"
 // Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node
 type Subscription struct {
     BackFill      bool
     BackFillOnly  bool
-    StartingBlock int64
-    EndingBlock   int64 // set to 0 or a negative value to have no ending block
+    StartingBlock *big.Int
+    EndingBlock   *big.Int // set to 0 or a negative value to have no ending block
     HeaderFilter  HeaderFilter
     TrxFilter     TrxFilter
     ReceiptFilter ReceiptFilter
     StateFilter   StateFilter
     StorageFilter StorageFilter
 }
 type HeaderFilter struct {
     Off       bool
     FinalOnly bool
 }
@@ -56,4 +58,4 @@ type StorageFilter struct {
     Addresses         []string
     StorageKeys       []string
     IntermediateNodes bool
 }
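The int64-to-*big.Int switch above is what actually makes this struct RLP-encodable: go-ethereum's rlp package does not support signed integer types, so the service layer (further down in this commit) could not hash a config that still carried int64 fields. A small illustrative sketch, not part of the commit:

package main

import (
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/rlp"
)

type withInt64 struct{ StartingBlock int64 }
type withBigInt struct{ StartingBlock *big.Int }

func main() {
    // rlp refuses signed integers, so encoding the int64 variant reports an error
    _, err := rlp.EncodeToBytes(withInt64{StartingBlock: 1})
    fmt.Println(err)

    // the *big.Int variant encodes cleanly and can therefore be hashed into a subscription type
    by, err := rlp.EncodeToBytes(withBigInt{StartingBlock: big.NewInt(1)})
    fmt.Println(len(by) > 0, err) // true <nil>
}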

View File

@@ -19,10 +19,10 @@ package ipfs
 import (
     "context"
-    "github.com/vulcanize/vulcanizedb/pkg/config"
-    "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/rpc"
+    log "github.com/sirupsen/logrus"
+    "github.com/vulcanize/vulcanizedb/pkg/config"
 )
 // APIName is the namespace used for the state diffing service API
@@ -58,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
 // subscribe to events from the SyncPublishScreenAndServe service
 payloadChannel := make(chan ResponsePayload, payloadChanBufferSize)
 quitChan := make(chan bool, 1)
-go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
+go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters)
 // loop and await state diff payloads and relay them to the subscriber with then notifier
 for {
@@ -66,17 +66,11 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
 case packet := <-payloadChannel:
     if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
         log.Error("Failed to send state diff packet", "err", notifyErr)
-        unSubErr := api.snp.Unsubscribe(rpcSub.ID)
-        if unSubErr != nil {
-            log.Error("Failed to unsubscribe from the state diff service", unSubErr)
-        }
+        api.snp.Unsubscribe(rpcSub.ID)
         return
     }
 case <-rpcSub.Err():
-    err := api.snp.Unsubscribe(rpcSub.ID)
-    if err != nil {
-        log.Error("Failed to unsubscribe from the state diff service", err)
-    }
+    api.snp.Unsubscribe(rpcSub.ID)
     return
 case <-quitChan:
     // don't need to unsubscribe, SyncPublishScreenAndServe service does so before sending the quit signal
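For context on how this API is consumed: a subscriber dials the seed node's websocket endpoint and opens an RPC subscription against the Stream method, passing its filter config as the parameter. A rough client-side sketch under stated assumptions: the "vdb" namespace and the trimmed payload/filter structs are placeholders, since the actual APIName value and full types are defined outside this diff.

package main

import (
    "context"
    "fmt"
    "math/big"

    "github.com/ethereum/go-ethereum/rpc"
)

// payload is a trimmed stand-in for the seed node's ResponsePayload
type payload struct {
    BlockNumber *big.Int `json:"blockNumber"`
    HeadersRlp  [][]byte `json:"headersRlp"`
}

// filters is a trimmed stand-in for config.Subscription
type filters struct {
    BackFill      bool
    StartingBlock *big.Int
    EndingBlock   *big.Int
}

func main() {
    // "ws://127.0.0.1:80" is the default ws url from the cmd changes above
    client, err := rpc.Dial("ws://127.0.0.1:80")
    if err != nil {
        panic(err)
    }
    payloadChan := make(chan payload, 200)
    // "vdb" is an assumed namespace; "stream" maps onto the Stream subscription method shown above
    sub, err := client.Subscribe(context.Background(), "vdb", payloadChan, "stream", filters{
        StartingBlock: big.NewInt(0),
        EndingBlock:   big.NewInt(0),
    })
    if err != nil {
        panic(err)
    }
    for {
        select {
        case p := <-payloadChan:
            fmt.Println("received payload for block", p.BlockNumber)
        case err := <-sub.Err():
            panic(err)
        }
    }
}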

View File

@@ -51,7 +51,9 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
 func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
     log.Debug("fetching iplds")
     blocks := &IpldWrapper{
+        BlockNumber:  cids.BlockNumber,
         Headers:      make([]blocks.Block, 0),
+        Uncles:       make([]blocks.Block, 0),
         Transactions: make([]blocks.Block, 0),
         Receipts:     make([]blocks.Block, 0),
         StateNodes:   make(map[common.Hash]blocks.Block),
@@ -62,6 +64,10 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
     if err != nil {
         return nil, err
     }
+    err = f.fetchUncles(cids, blocks)
+    if err != nil {
+        return nil, err
+    }
     err = f.fetchTrxs(cids, blocks)
     if err != nil {
         return nil, err
@@ -101,6 +107,25 @@ func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) erro
     return nil
 }
+// fetchUncles fetches uncles
+// It uses the f.fetchBatch method
+func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error {
+    log.Debug("fetching uncle iplds")
+    uncleCids := make([]cid.Cid, 0, len(cids.Uncles))
+    for _, c := range cids.Uncles {
+        dc, err := cid.Decode(c)
+        if err != nil {
+            return err
+        }
+        uncleCids = append(uncleCids, dc)
+    }
+    blocks.Uncles = f.fetchBatch(uncleCids)
+    if len(blocks.Uncles) != len(uncleCids) {
+        log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(blocks.Uncles), len(uncleCids))
+    }
+    return nil
+}
 // fetchTrxs fetches transactions
 // It uses the f.fetchBatch method
 func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error {

View File

@@ -49,8 +49,8 @@ func AddressToKey(address common.Address) common.Hash {
     return crypto.Keccak256Hash(address[:])
 }
-// HexToKey hashes a hex (0x leading) string
+// HexToKey hashes a hex (0x leading or not) string
 func HexToKey(hex string) common.Hash {
-    addr := common.HexToAddress(hex)
+    addr := common.FromHex(hex)
     return crypto.Keccak256Hash(addr[:])
 }
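The HexToKey change matters for keys that are not address-length: common.HexToAddress coerces its input to 20 bytes before hashing, while common.FromHex simply decodes the hex string, so a 32-byte key is now hashed over its actual bytes. A quick illustration, not part of the commit:

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/crypto"
)

func main() {
    // a 32-byte storage key, not a 20-byte address
    hex := "0x290decd9548b62a8d60345a988386fc84ba6bc95484008f6362f93160ef3e563"

    oldKey := crypto.Keccak256Hash(common.HexToAddress(hex).Bytes()) // coerced to 20 bytes first
    newKey := crypto.Keccak256Hash(common.FromHex(hex))              // hashes the full 32 bytes

    fmt.Println(oldKey != newKey) // true: the 20-byte coercion changed what was hashed
}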

View File

@@ -17,7 +17,6 @@
 package ipfs
 import (
-    "github.com/ethereum/go-ethereum/core"
     "github.com/jmoiron/sqlx"
     "github.com/lib/pq"
@@ -138,7 +137,3 @@ func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID,
         stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
     return err
 }
-type RepositoryError struct {
-    core.Message
-}

View File

@@ -38,6 +38,7 @@ func NewIPLDResolver() *EthIPLDResolver {
 func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) {
     response := new(ResponsePayload)
     eir.resolveHeaders(ipfsBlocks.Headers, response)
+    eir.resolveUncles(ipfsBlocks.Uncles, response)
     eir.resolveTransactions(ipfsBlocks.Transactions, response)
     eir.resolveReceipts(ipfsBlocks.Receipts, response)
     eir.resolveState(ipfsBlocks.StateNodes, response)
@@ -52,6 +53,13 @@ func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *Resp
     }
 }
+func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *ResponsePayload) {
+    for _, block := range blocks {
+        raw := block.RawData()
+        response.UnclesRlp = append(response.UnclesRlp, raw)
+    }
+}
 func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) {
     for _, block := range blocks {
         raw := block.RawData()

View File

@@ -17,6 +17,8 @@
 package ipfs
 import (
+    "math/big"
     "github.com/jmoiron/sqlx"
     "github.com/lib/pq"
     log "github.com/sirupsen/logrus"
@@ -54,13 +56,13 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
     log.Debug("retrieving cids")
     var endingBlock int64
     var err error
-    if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock {
+    if streamFilters.EndingBlock.Int64() <= 0 || streamFilters.EndingBlock.Int64() <= streamFilters.StartingBlock.Int64() {
         endingBlock, err = ecr.GetLastBlockNumber()
         if err != nil {
             return nil, err
         }
     }
-    cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock)
+    cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock.Int64())
     tx, err := ecr.db.Beginx()
     if err != nil {
         return nil, err
@@ -69,8 +71,11 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
     log.Debug("backfill ending block:", endingBlock)
     // THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS
     // WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO
-    for i := streamFilters.StartingBlock; i <= endingBlock; i++ {
+    for i := streamFilters.StartingBlock.Int64(); i <= endingBlock; i++ {
         cw := CidWrapper{}
+        cw.BlockNumber = big.NewInt(i)
+        // Retrieve cached header CIDs
         if !streamFilters.HeaderFilter.Off {
             cw.Headers, err = ecr.retrieveHeaderCIDs(tx, streamFilters, i)
             if err != nil {
@@ -78,7 +83,17 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
                 log.Error("header cid retrieval error")
                 return nil, err
             }
+            if !streamFilters.HeaderFilter.FinalOnly {
+                cw.Uncles, err = ecr.retrieveUncleCIDs(tx, streamFilters, i)
+                if err != nil {
+                    tx.Rollback()
+                    log.Error("header cid retrieval error")
+                    return nil, err
+                }
+            }
         }
+        // Retrieve cached trx CIDs
         var trxIds []int64
         if !streamFilters.TrxFilter.Off {
             cw.Transactions, trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, i)
@@ -88,6 +103,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
                 return nil, err
             }
         }
+        // Retrieve cached receipt CIDs
         if !streamFilters.ReceiptFilter.Off {
             cw.Receipts, err = ecr.retrieveRctCIDs(tx, streamFilters, i, trxIds)
             if err != nil {
@@ -96,6 +113,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
                 return nil, err
             }
         }
+        // Retrieve cached state CIDs
         if !streamFilters.StateFilter.Off {
             cw.StateNodes, err = ecr.retrieveStateCIDs(tx, streamFilters, i)
             if err != nil {
@@ -104,6 +123,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
                 return nil, err
             }
         }
+        // Retrieve cached storage CIDs
         if !streamFilters.StorageFilter.Off {
             cw.StorageNodes, err = ecr.retrieveStorageCIDs(tx, streamFilters, i)
             if err != nil {
@@ -122,10 +143,16 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config
     log.Debug("retrieving header cids for block ", blockNumber)
     headers := make([]string, 0)
     pgStr := `SELECT cid FROM header_cids
-        WHERE block_number = $1`
-    if streamFilters.HeaderFilter.FinalOnly {
-        pgStr += ` AND final IS TRUE`
-    }
+        WHERE block_number = $1 AND final IS TRUE`
+    err := tx.Select(&headers, pgStr, blockNumber)
+    return headers, err
+}
+func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) {
+    log.Debug("retrieving header cids for block ", blockNumber)
+    headers := make([]string, 0)
+    pgStr := `SELECT cid FROM header_cids
+        WHERE block_number = $1 AND final IS FALSE`
     err := tx.Select(&headers, pgStr, blockNumber)
     return headers, err
 }

View File

@@ -28,7 +28,7 @@ import (
 // ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload
 type ResponseScreener interface {
-    ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
+    ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
 }
 // Screener is the underlying struct for the ReponseScreener interface
@@ -40,7 +40,7 @@ func NewResponseScreener() *Screener {
 }
 // ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload
-func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
+func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
     response := new(ResponsePayload)
     err := s.filterHeaders(streamFilters, response, payload)
     if err != nil {
@@ -62,11 +62,12 @@ func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IP
     if err != nil {
         return nil, err
     }
+    response.BlockNumber = payload.BlockNumber
     return response, nil
 }
-func (s *Screener) filterHeaders(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-    if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterHeaders(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+    if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
         response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
         if !streamFilters.HeaderFilter.FinalOnly {
             for _, uncle := range payload.BlockBody.Uncles {
@@ -88,9 +89,9 @@ func checkRange(start, end, actual int64) bool {
     return false
 }
-func (s *Screener) filterTransactions(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
+func (s *Screener) filterTransactions(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
     trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
-    if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+    if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
         for i, trx := range payload.BlockBody.Transactions {
             if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) {
                 trxBuffer := new(bytes.Buffer)
@@ -124,8 +125,8 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
     return false
 }
-func (s *Screener) filerReceipts(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
-    if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
+    if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
         for i, receipt := range payload.Receipts {
             if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) {
                 receiptForStorage := (*types.ReceiptForStorage)(receipt)
@@ -161,9 +162,9 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, want
     return false
 }
-func (s *Screener) filterState(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-    response.StateNodesRlp = make(map[common.Hash][]byte)
-    if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterState(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+    if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
+        response.StateNodesRlp = make(map[common.Hash][]byte)
         keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
         for _, addr := range streamFilters.StateFilter.Addresses {
             keyFilter := AddressToKey(common.HexToAddress(addr))
@@ -193,8 +194,9 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
     return false
 }
-func (s *Screener) filterStorage(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-    if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterStorage(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+    if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
+        response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
         stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
         for _, addr := range streamFilters.StorageFilter.Addresses {
             keyFilter := AddressToKey(common.HexToAddress(addr))

View File

@@ -20,22 +20,24 @@ import (
     "fmt"
     "sync"
-    "github.com/vulcanize/vulcanizedb/pkg/config"
+    "github.com/ethereum/go-ethereum/common"
+    "github.com/ethereum/go-ethereum/crypto"
     "github.com/ethereum/go-ethereum/node"
     "github.com/ethereum/go-ethereum/p2p"
+    "github.com/ethereum/go-ethereum/rlp"
     "github.com/ethereum/go-ethereum/rpc"
     "github.com/ethereum/go-ethereum/statediff"
     log "github.com/sirupsen/logrus"
+    "github.com/vulcanize/vulcanizedb/pkg/config"
     "github.com/vulcanize/vulcanizedb/pkg/core"
     "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
 )
 const payloadChanBufferSize = 20000 // the max eth sub buffer size
-// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
-// indexing all Ethereum data screening this data, and serving it up to subscribed clients
+// SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing,
+// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
 // This service is compatible with the Ethereum service interface (node.Service)
 type SyncPublishScreenAndServe interface {
     // APIs(), Protocols(), Start() and Stop()
@@ -45,9 +47,9 @@ type SyncPublishScreenAndServe interface {
     // Main event loop for handling client pub-sub
     ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
     // Method to subscribe to receive state diff processing output
-    Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription)
+    Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription)
     // Method to unsubscribe from state diff processing
-    Unsubscribe(id rpc.ID) error
+    Unsubscribe(id rpc.ID)
 }
 // Service is the underlying struct for the SyncAndPublish interface
@@ -74,8 +76,10 @@ type Service struct {
     PayloadChan chan statediff.Payload
     // Used to signal shutdown of the service
     QuitChan chan bool
-    // A mapping of rpc.IDs to their subscription channels
-    Subscriptions map[rpc.ID]Subscription
+    // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters)
+    Subscriptions map[common.Hash]map[rpc.ID]Subscription
+    // A mapping of subscription hash type to the corresponding StreamFilters
+    SubscriptionTypes map[common.Hash]config.Subscription
 }
 // NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
@@ -89,17 +93,18 @@ func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient
         return nil, err
     }
     return &Service{
         Streamer:    NewStateDiffStreamer(rpcClient),
         Repository:  NewCIDRepository(db),
         Converter:   NewPayloadConverter(ethClient),
         Publisher:   publisher,
         Screener:    NewResponseScreener(),
         Fetcher:     fetcher,
         Retriever:   NewCIDRetriever(db),
         Resolver:    NewIPLDResolver(),
         PayloadChan: make(chan statediff.Payload, payloadChanBufferSize),
         QuitChan:    qc,
-        Subscriptions: make(map[rpc.ID]Subscription),
+        Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription),
+        SubscriptionTypes: make(map[common.Hash]config.Subscription),
     }, nil
 }
@@ -196,40 +201,60 @@ func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan
 }
 func (sap *Service) processResponse(payload IPLDPayload) error {
-    for id, sub := range sap.Subscriptions {
-        response, err := sap.Screener.ScreenResponse(sub.StreamFilters, payload)
+    for ty, subs := range sap.Subscriptions {
+        // Retreive the subscription paramaters for this subscription type
+        subConfig, ok := sap.SubscriptionTypes[ty]
+        if !ok {
+            return fmt.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+        }
+        response, err := sap.Screener.ScreenResponse(subConfig, payload)
         if err != nil {
             return err
         }
-        sap.serve(id, *response)
+        for id := range subs {
+            //TODO send payloads to this type of sub
+            sap.serve(id, *response, ty)
+        }
     }
     return nil
 }
 // Subscribe is used by the API to subscribe to the service loop
-func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) {
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription) {
     log.Info("Subscribing to the seed node service")
+    // Subscription type is defined as the hash of its content
+    // Group subscriptions by type and screen payloads once for subs of the same type
+    by, err := rlp.EncodeToBytes(streamFilters)
+    if err != nil {
+        log.Error(err)
+    }
+    subscriptionHash := crypto.Keccak256(by)
+    subscriptionType := common.BytesToHash(subscriptionHash)
     subscription := Subscription{
         PayloadChan: sub,
         QuitChan:    quitChan,
-        StreamFilters: streamFilters,
     }
     // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
     // Otherwise we only filter new data as it is streamed in from the state diffing geth node
     if streamFilters.BackFill || streamFilters.BackFillOnly {
-        sap.backFill(subscription, id)
+        sap.backFill(subscription, id, streamFilters)
     }
     if !streamFilters.BackFillOnly {
         sap.Lock()
-        sap.Subscriptions[id] = subscription
+        if sap.Subscriptions[subscriptionType] == nil {
+            sap.Subscriptions[subscriptionType] = make(map[rpc.ID]Subscription)
+        }
+        sap.Subscriptions[subscriptionType][id] = subscription
+        sap.SubscriptionTypes[subscriptionType] = streamFilters
         sap.Unlock()
     }
 }
-func (sap *Service) backFill(sub Subscription, id rpc.ID) {
+func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscription) {
     log.Debug("back-filling data for id", id)
     // Retrieve cached CIDs relevant to this subscriber
-    cidWrappers, err := sap.Retriever.RetrieveCIDs(*sub.StreamFilters)
+    cidWrappers, err := sap.Retriever.RetrieveCIDs(con)
     if err != nil {
         sub.PayloadChan <- ResponsePayload{
             ErrMsg: "CID retrieval error: " + err.Error(),
@@ -260,16 +285,18 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID) {
 }
 // Unsubscribe is used to unsubscribe to the StateDiffingService loop
-func (sap *Service) Unsubscribe(id rpc.ID) error {
+func (sap *Service) Unsubscribe(id rpc.ID) {
     log.Info("Unsubscribing from the seed node service")
     sap.Lock()
-    _, ok := sap.Subscriptions[id]
-    if !ok {
-        return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
+    for ty := range sap.Subscriptions {
+        delete(sap.Subscriptions[ty], id)
+        if len(sap.Subscriptions[ty]) == 0 {
+            // If we removed the last subscription of this type, remove the subscription type outright
+            delete(sap.Subscriptions, ty)
+            delete(sap.SubscriptionTypes, ty)
+        }
     }
-    delete(sap.Subscriptions, id)
     sap.Unlock()
-    return nil
 }
 // Start is used to begin the service
@@ -293,9 +320,9 @@ func (sap *Service) Stop() error {
 }
 // serve is used to send screened payloads to their requesting sub
-func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
+func (sap *Service) serve(id rpc.ID, payload ResponsePayload, ty common.Hash) {
     sap.Lock()
-    sub, ok := sap.Subscriptions[id]
+    sub, ok := sap.Subscriptions[ty][id]
     if ok {
         select {
         case sub.PayloadChan <- payload:
@@ -310,14 +337,17 @@ func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
 // close is used to close all listening subscriptions
 func (sap *Service) close() {
     sap.Lock()
-    for id, sub := range sap.Subscriptions {
-        select {
-        case sub.QuitChan <- true:
-            log.Infof("closing subscription %s", id)
-        default:
-            log.Infof("unable to close subscription %s; channel has no receiver", id)
+    for ty, subs := range sap.Subscriptions {
+        for id, sub := range subs {
+            select {
+            case sub.QuitChan <- true:
+                log.Infof("closing subscription %s", id)
+            default:
+                log.Infof("unable to close subscription %s; channel has no receiver", id)
+            }
         }
-        delete(sap.Subscriptions, id)
+        delete(sap.Subscriptions, ty)
+        delete(sap.SubscriptionTypes, ty)
     }
     sap.Unlock()
 }

View File

@@ -20,23 +20,20 @@ import (
     "encoding/json"
     "math/big"
-    "github.com/vulcanize/vulcanizedb/pkg/config"
-    "github.com/ipfs/go-block-format"
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/types"
+    "github.com/ipfs/go-block-format"
 )
 // Subscription holds the information for an individual client subscription
 type Subscription struct {
     PayloadChan chan<- ResponsePayload
     QuitChan    chan<- bool
-    StreamFilters *config.Subscription
 }
 // ResponsePayload holds the data returned from the seed node to the requesting client
 type ResponsePayload struct {
+    BlockNumber     *big.Int `json:"blockNumber"`
     HeadersRlp      [][]byte `json:"headersRlp"`
     UnclesRlp       [][]byte `json:"unclesRlp"`
     TransactionsRlp [][]byte `json:"transactionsRlp"`
@@ -69,8 +66,9 @@ func (sd *ResponsePayload) Encode() ([]byte, error) {
 // CidWrapper is used to package CIDs retrieved from the local Postgres cache
 type CidWrapper struct {
-    BlockNumber  int64
+    BlockNumber  *big.Int
     Headers      []string
+    Uncles       []string
     Transactions []string
     Receipts     []string
     StateNodes   []StateNodeCID
@@ -79,7 +77,9 @@ type CidWrapper struct {
 // IpldWrapper is used to package raw IPLD block data for resolution
 type IpldWrapper struct {
+    BlockNumber  *big.Int
     Headers      []blocks.Block
+    Uncles       []blocks.Block
     Transactions []blocks.Block
     Receipts     []blocks.Block
     StateNodes   map[common.Hash]blocks.Block