diff --git a/cmd/streamSubscribe.go b/cmd/streamSubscribe.go
index 63059883..140952a0 100644
--- a/cmd/streamSubscribe.go
+++ b/cmd/streamSubscribe.go
@@ -18,6 +18,7 @@ package cmd
 import (
 	"bytes"
 	"fmt"
+	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
@@ -169,8 +170,8 @@ func subscriptionConfig() {
 
 		// Below default to 0
 		// 0 start means we start at the beginning and 0 end means we continue indefinitely
-		StartingBlock: viper.GetInt64("subscription.startingBlock"),
-		EndingBlock:   viper.GetInt64("subscription.endingBlock"),
+		StartingBlock: big.NewInt(viper.GetInt64("subscription.startingBlock")),
+		EndingBlock:   big.NewInt(viper.GetInt64("subscription.endingBlock")),
 
 		// Below default to false, which means we get all headers by default
 		HeaderFilter: config.HeaderFilter{
@@ -215,7 +216,7 @@ func subscriptionConfig() {
 func getRpcClient() core.RpcClient {
 	vulcPath := viper.GetString("subscription.path")
 	if vulcPath == "" {
-		vulcPath = "ws://127.0.0.1:2019" // default to and try the default ws url if no path is provided
+		vulcPath = "ws://127.0.0.1:80" // default to and try the default ws url if no path is provided
 	}
 	rawRpcClient, err := rpc.Dial(vulcPath)
 	if err != nil {
diff --git a/cmd/syncPublishScreenAndServe.go b/cmd/syncPublishScreenAndServe.go
index 1288a3a4..75123327 100644
--- a/cmd/syncPublishScreenAndServe.go
+++ b/cmd/syncPublishScreenAndServe.go
@@ -48,8 +48,6 @@ func init() {
 }
 
 func syncPublishScreenAndServe() {
-	log.SetLevel(log.DebugLevel)
-	log.SetOutput(os.Stdout)
 	blockChain, ethClient, rpcClient := getBlockChainAndClients()
 
 	db := utils.LoadPostgres(databaseConfig, blockChain.Node())
diff --git a/documentation/seed-node.md b/documentation/seed-node.md
index db4bb4c3..37388410 100644
--- a/documentation/seed-node.md
+++ b/documentation/seed-node.md
@@ -1,5 +1,6 @@
 # Seed node commands
-Another way that Vulcanizedb can serve as a caching layer for Ethereum is through the use of the `syncAndPublish` and
+
+Vulcanizedb can act as an index for Ethereum data stored on IPFS through the use of the `syncAndPublish` and
 `syncPublishScreenAndServe` commands.
 
 ## Setup
@@ -60,7 +61,7 @@ And then run the ipfs command
 
 Or we can use the pre-made script at `GOPATH/src/github.com/ipfs/go-ipfs/misc/utility/ipfs_postgres.sh`
 which has usage:
 
-`./ipfs_postgres.sh `
+`./ipfs_postgres.sh "`
 
 and will ask us to enter the password, avoiding storing it to an ENV variable.
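Note (editor, not part of the change set): for orientation, a minimal client-side sketch of how the settings above are consumed follows. The "vdb" namespace and "stream" method names are assumptions standing in for whatever APIName and the Stream method in pkg/ipfs/api.go actually register, and production code should nil-check the new *big.Int bounds, since the retriever and screener call Int64() on them directly.

// Hypothetical subscriber sketch. Assumed, not confirmed by this diff: the RPC
// namespace "vdb" and subscription method "stream"; adjust to the real APIName
// and Stream method registered in pkg/ipfs/api.go.
package main

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum/rpc"

	"github.com/vulcanize/vulcanizedb/pkg/config"
	"github.com/vulcanize/vulcanizedb/pkg/ipfs"
)

func main() {
	// Default path used by streamSubscribe when subscription.path is unset
	client, err := rpc.Dial("ws://127.0.0.1:80")
	if err != nil {
		panic(err)
	}
	payloads := make(chan ipfs.ResponsePayload, 20000)
	filters := config.Subscription{
		// StartingBlock and EndingBlock are now *big.Int
		StartingBlock: big.NewInt(0),
		EndingBlock:   big.NewInt(0), // 0 or a negative value means no ending block
	}
	sub, err := client.Subscribe(context.Background(), "vdb", payloads, "stream", filters)
	if err != nil {
		panic(err)
	}
	for {
		select {
		case payload := <-payloads:
			_ = payload // process the RLP-encoded headers, uncles, transactions, etc.
		case err := <-sub.Err():
			panic(err)
		}
	}
}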
diff --git a/pkg/config/subscription.go b/pkg/config/subscription.go
index 2aea5d06..c304b066 100644
--- a/pkg/config/subscription.go
+++ b/pkg/config/subscription.go
@@ -16,21 +16,23 @@
 
 package config
 
+import "math/big"
+
 // Subscription config is used by a subscribing transformer to specifiy which data to receive from the seed node
 type Subscription struct {
 	BackFill      bool
 	BackFillOnly  bool
-	StartingBlock int64
-	EndingBlock   int64 // set to 0 or a negative value to have no ending block
+	StartingBlock *big.Int
+	EndingBlock   *big.Int // set to 0 or a negative value to have no ending block
 	HeaderFilter  HeaderFilter
-	TrxFilter     TrxFilter
+	TrxFilter     TrxFilter
 	ReceiptFilter ReceiptFilter
-	StateFilter   StateFilter
+	StateFilter   StateFilter
 	StorageFilter StorageFilter
 }
 
 type HeaderFilter struct {
-	Off bool
+	Off       bool
 	FinalOnly bool
 }
 
@@ -56,4 +58,4 @@ type StorageFilter struct {
 	Addresses         []string
 	StorageKeys       []string
 	IntermediateNodes bool
-}
\ No newline at end of file
+}
diff --git a/pkg/ipfs/api.go b/pkg/ipfs/api.go
index f5169e5c..d33de181 100644
--- a/pkg/ipfs/api.go
+++ b/pkg/ipfs/api.go
@@ -19,10 +19,10 @@ package ipfs
 import (
 	"context"
 
-	"github.com/vulcanize/vulcanizedb/pkg/config"
-
-	"github.com/ethereum/go-ethereum/log"
 	"github.com/ethereum/go-ethereum/rpc"
+	log "github.com/sirupsen/logrus"
+
+	"github.com/vulcanize/vulcanizedb/pkg/config"
 )
 
 // APIName is the namespace used for the state diffing service API
@@ -58,7 +58,7 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
 	// subscribe to events from the SyncPublishScreenAndServe service
 	payloadChannel := make(chan ResponsePayload, payloadChanBufferSize)
 	quitChan := make(chan bool, 1)
-	go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, &streamFilters)
+	go api.snp.Subscribe(rpcSub.ID, payloadChannel, quitChan, streamFilters)
 
 	// loop and await state diff payloads and relay them to the subscriber with then notifier
 	for {
@@ -66,17 +66,11 @@ func (api *PublicSeedNodeAPI) Stream(ctx context.Context, streamFilters config.S
 		case packet := <-payloadChannel:
 			if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
 				log.Error("Failed to send state diff packet", "err", notifyErr)
-				unSubErr := api.snp.Unsubscribe(rpcSub.ID)
-				if unSubErr != nil {
-					log.Error("Failed to unsubscribe from the state diff service", unSubErr)
-				}
+				api.snp.Unsubscribe(rpcSub.ID)
 				return
 			}
 		case <-rpcSub.Err():
-			err := api.snp.Unsubscribe(rpcSub.ID)
-			if err != nil {
-				log.Error("Failed to unsubscribe from the state diff service", err)
-			}
+			api.snp.Unsubscribe(rpcSub.ID)
 			return
 		case <-quitChan:
 			// don't need to unsubscribe, SyncPublishScreenAndServe service does so before sending the quit signal
diff --git a/pkg/ipfs/fetcher.go b/pkg/ipfs/fetcher.go
index 1d50d1be..8a15fe55 100644
--- a/pkg/ipfs/fetcher.go
+++ b/pkg/ipfs/fetcher.go
@@ -51,7 +51,9 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
 func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
 	log.Debug("fetching iplds")
 	blocks := &IpldWrapper{
+		BlockNumber:  cids.BlockNumber,
 		Headers:      make([]blocks.Block, 0),
+		Uncles:       make([]blocks.Block, 0),
 		Transactions: make([]blocks.Block, 0),
 		Receipts:     make([]blocks.Block, 0),
 		StateNodes:   make(map[common.Hash]blocks.Block),
@@ -62,6 +64,10 @@ func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
 	if err != nil {
 		return nil, err
 	}
+	err = f.fetchUncles(cids, blocks)
+	if err != nil {
+		return nil, err
+	}
 	err = f.fetchTrxs(cids, blocks)
 	if err != nil {
 		return nil, err
@@ -101,6 +107,25 @@ func (f *EthIPLDFetcher) fetchHeaders(cids CidWrapper, blocks *IpldWrapper) erro
 	return nil
 }
 
+// fetchUncles fetches uncles
+// It uses the f.fetchBatch method
+func (f *EthIPLDFetcher) fetchUncles(cids CidWrapper, blocks *IpldWrapper) error {
+	log.Debug("fetching uncle iplds")
+	uncleCids := make([]cid.Cid, 0, len(cids.Uncles))
+	for _, c := range cids.Uncles {
+		dc, err := cid.Decode(c)
+		if err != nil {
+			return err
+		}
+		uncleCids = append(uncleCids, dc)
+	}
+	blocks.Uncles = f.fetchBatch(uncleCids)
+	if len(blocks.Uncles) != len(uncleCids) {
+		log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(blocks.Uncles), len(uncleCids))
+	}
+	return nil
+}
+
 // fetchTrxs fetches transactions
 // It uses the f.fetchBatch method
 func (f *EthIPLDFetcher) fetchTrxs(cids CidWrapper, blocks *IpldWrapper) error {
diff --git a/pkg/ipfs/helpers.go b/pkg/ipfs/helpers.go
index c2b8e15c..b741bda7 100644
--- a/pkg/ipfs/helpers.go
+++ b/pkg/ipfs/helpers.go
@@ -49,8 +49,8 @@ func AddressToKey(address common.Address) common.Hash {
 	return crypto.Keccak256Hash(address[:])
 }
 
-// HexToKey hashes a hex (0x leading) string
+// HexToKey hashes a hex (0x leading or not) string
 func HexToKey(hex string) common.Hash {
-	addr := common.HexToAddress(hex)
+	addr := common.FromHex(hex)
 	return crypto.Keccak256Hash(addr[:])
 }
diff --git a/pkg/ipfs/repository.go b/pkg/ipfs/repository.go
index 66da684d..320a3860 100644
--- a/pkg/ipfs/repository.go
+++ b/pkg/ipfs/repository.go
@@ -17,7 +17,6 @@
 package ipfs
 
 import (
-	"github.com/ethereum/go-ethereum/core"
 	"github.com/jmoiron/sqlx"
 	"github.com/lib/pq"
 
@@ -138,7 +137,3 @@ func (repo *Repository) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeCID,
 		stateID, storageCID.Key, storageCID.CID, storageCID.Leaf)
 	return err
 }
-
-type RepositoryError struct {
-	core.Message
-}
diff --git a/pkg/ipfs/resolver.go b/pkg/ipfs/resolver.go
index 90fe92fd..9e71ee90 100644
--- a/pkg/ipfs/resolver.go
+++ b/pkg/ipfs/resolver.go
@@ -38,6 +38,7 @@ func NewIPLDResolver() *EthIPLDResolver {
 func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) {
 	response := new(ResponsePayload)
 	eir.resolveHeaders(ipfsBlocks.Headers, response)
+	eir.resolveUncles(ipfsBlocks.Uncles, response)
 	eir.resolveTransactions(ipfsBlocks.Transactions, response)
 	eir.resolveReceipts(ipfsBlocks.Receipts, response)
 	eir.resolveState(ipfsBlocks.StateNodes, response)
@@ -52,6 +53,13 @@ func (eir *EthIPLDResolver) resolveHeaders(blocks []blocks.Block, response *Resp
 	}
 }
 
+func (eir *EthIPLDResolver) resolveUncles(blocks []blocks.Block, response *ResponsePayload) {
+	for _, block := range blocks {
+		raw := block.RawData()
+		response.UnclesRlp = append(response.UnclesRlp, raw)
+	}
+}
+
 func (eir *EthIPLDResolver) resolveTransactions(blocks []blocks.Block, response *ResponsePayload) {
 	for _, block := range blocks {
 		raw := block.RawData()
diff --git a/pkg/ipfs/retreiver.go b/pkg/ipfs/retreiver.go
index b43c63f3..06394a0b 100644
--- a/pkg/ipfs/retreiver.go
+++ b/pkg/ipfs/retreiver.go
@@ -17,6 +17,8 @@ package ipfs
 
 import (
+	"math/big"
+
 	"github.com/jmoiron/sqlx"
 	"github.com/lib/pq"
 	log "github.com/sirupsen/logrus"
@@ -54,13 +56,13 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 	log.Debug("retrieving cids")
 	var endingBlock int64
 	var err error
-	if streamFilters.EndingBlock <= 0 || streamFilters.EndingBlock <= streamFilters.StartingBlock {
+	if streamFilters.EndingBlock.Int64() <= 0 || streamFilters.EndingBlock.Int64() <= streamFilters.StartingBlock.Int64() {
 		endingBlock, err = ecr.GetLastBlockNumber()
 		if err != nil {
 			return nil, err
 		}
 	}
-	cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock)
+	cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock.Int64())
 	tx, err := ecr.db.Beginx()
 	if err != nil {
 		return nil, err
 	}
@@ -69,8 +71,11 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 	log.Debug("backfill ending block:", endingBlock)
 	// THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS
 	// WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO
-	for i := streamFilters.StartingBlock; i <= endingBlock; i++ {
+	for i := streamFilters.StartingBlock.Int64(); i <= endingBlock; i++ {
 		cw := CidWrapper{}
+		cw.BlockNumber = big.NewInt(i)
+
+		// Retrieve cached header CIDs
 		if !streamFilters.HeaderFilter.Off {
 			cw.Headers, err = ecr.retrieveHeaderCIDs(tx, streamFilters, i)
 			if err != nil {
@@ -78,7 +83,17 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 				log.Error("header cid retrieval error")
 				return nil, err
 			}
+			if !streamFilters.HeaderFilter.FinalOnly {
+				cw.Uncles, err = ecr.retrieveUncleCIDs(tx, streamFilters, i)
+				if err != nil {
+					tx.Rollback()
+					log.Error("uncle cid retrieval error")
+					return nil, err
+				}
+			}
 		}
+
+		// Retrieve cached trx CIDs
 		var trxIds []int64
 		if !streamFilters.TrxFilter.Off {
 			cw.Transactions, trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, i)
@@ -88,6 +103,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 				return nil, err
 			}
 		}
+
+		// Retrieve cached receipt CIDs
 		if !streamFilters.ReceiptFilter.Off {
 			cw.Receipts, err = ecr.retrieveRctCIDs(tx, streamFilters, i, trxIds)
 			if err != nil {
@@ -96,6 +113,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 				return nil, err
 			}
 		}
+
+		// Retrieve cached state CIDs
 		if !streamFilters.StateFilter.Off {
 			cw.StateNodes, err = ecr.retrieveStateCIDs(tx, streamFilters, i)
 			if err != nil {
@@ -104,6 +123,8 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
 				return nil, err
 			}
 		}
+
+		// Retrieve cached storage CIDs
 		if !streamFilters.StorageFilter.Off {
 			cw.StorageNodes, err = ecr.retrieveStorageCIDs(tx, streamFilters, i)
 			if err != nil {
@@ -122,10 +143,16 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config
 	log.Debug("retrieving header cids for block ", blockNumber)
 	headers := make([]string, 0)
 	pgStr := `SELECT cid FROM header_cids
-				WHERE block_number = $1`
-	if streamFilters.HeaderFilter.FinalOnly {
-		pgStr += ` AND final IS TRUE`
-	}
+				WHERE block_number = $1 AND final IS TRUE`
+	err := tx.Select(&headers, pgStr, blockNumber)
+	return headers, err
+}
+
+func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) {
+	log.Debug("retrieving uncle cids for block ", blockNumber)
+	headers := make([]string, 0)
+	pgStr := `SELECT cid FROM header_cids
+				WHERE block_number = $1 AND final IS FALSE`
 	err := tx.Select(&headers, pgStr, blockNumber)
 	return headers, err
 }
diff --git a/pkg/ipfs/screener.go b/pkg/ipfs/screener.go
index 98ab9030..0ddf027e 100644
--- a/pkg/ipfs/screener.go
+++ b/pkg/ipfs/screener.go
@@ -28,7 +28,7 @@ import (
 
 // ResponseScreener is the inteface used to screen eth data and package appropriate data into a response payload
 type ResponseScreener interface {
-	ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
+	ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error)
 }
 
 // Screener is the underlying struct for the ReponseScreener interface
@@ -40,7 +40,7 @@ func NewResponseScreener() *Screener {
 }
 
 // ScreenResponse is used to filter through eth data to extract and package requested data into a ResponsePayload
-func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
+func (s *Screener) ScreenResponse(streamFilters config.Subscription, payload IPLDPayload) (*ResponsePayload, error) {
 	response := new(ResponsePayload)
 	err := s.filterHeaders(streamFilters, response, payload)
 	if err != nil {
@@ -62,11 +62,12 @@ func (s *Screener) ScreenResponse(streamFilters *config.Subscription, payload IP
 	if err != nil {
 		return nil, err
 	}
+	response.BlockNumber = payload.BlockNumber
 	return response, nil
 }
 
-func (s *Screener) filterHeaders(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-	if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterHeaders(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+	if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
 		response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
 		if !streamFilters.HeaderFilter.FinalOnly {
 			for _, uncle := range payload.BlockBody.Uncles {
@@ -88,9 +89,9 @@ func checkRange(start, end, actual int64) bool {
 	return false
 }
 
-func (s *Screener) filterTransactions(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
+func (s *Screener) filterTransactions(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) ([]common.Hash, error) {
 	trxHashes := make([]common.Hash, 0, len(payload.BlockBody.Transactions))
-	if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+	if !streamFilters.TrxFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
 		for i, trx := range payload.BlockBody.Transactions {
 			if checkTransactions(streamFilters.TrxFilter.Src, streamFilters.TrxFilter.Dst, payload.TrxMetaData[i].Src, payload.TrxMetaData[i].Dst) {
 				trxBuffer := new(bytes.Buffer)
@@ -124,8 +125,8 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
 	return false
 }
 
-func (s *Screener) filerReceipts(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
-	if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filerReceipts(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload, trxHashes []common.Hash) error {
+	if !streamFilters.ReceiptFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
 		for i, receipt := range payload.Receipts {
 			if checkReceipts(receipt, streamFilters.ReceiptFilter.Topic0s, payload.ReceiptMetaData[i].Topic0s, trxHashes) {
 				receiptForStorage := (*types.ReceiptForStorage)(receipt)
@@ -161,9 +162,9 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics []string, want
 	return false
 }
 
-func (s *Screener) filterState(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-	response.StateNodesRlp = make(map[common.Hash][]byte)
-	if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterState(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+	if !streamFilters.StateFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
+		response.StateNodesRlp = make(map[common.Hash][]byte)
 		keyFilters := make([]common.Hash, 0, len(streamFilters.StateFilter.Addresses))
 		for _, addr := range streamFilters.StateFilter.Addresses {
 			keyFilter := AddressToKey(common.HexToAddress(addr))
@@ -193,8 +194,9 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
 	return false
 }
 
-func (s *Screener) filterStorage(streamFilters *config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
-	if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock, streamFilters.EndingBlock, payload.BlockNumber.Int64()) {
+func (s *Screener) filterStorage(streamFilters config.Subscription, response *ResponsePayload, payload IPLDPayload) error {
+	if !streamFilters.StorageFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
+		response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
 		stateKeyFilters := make([]common.Hash, 0, len(streamFilters.StorageFilter.Addresses))
 		for _, addr := range streamFilters.StorageFilter.Addresses {
 			keyFilter := AddressToKey(common.HexToAddress(addr))
diff --git a/pkg/ipfs/service.go b/pkg/ipfs/service.go
index 4f0674f2..e16f4bb2 100644
--- a/pkg/ipfs/service.go
+++ b/pkg/ipfs/service.go
@@ -20,22 +20,24 @@ import (
 	"fmt"
 	"sync"
 
-	"github.com/vulcanize/vulcanizedb/pkg/config"
-
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/node"
 	"github.com/ethereum/go-ethereum/p2p"
+	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/ethereum/go-ethereum/statediff"
 	log "github.com/sirupsen/logrus"
 
+	"github.com/vulcanize/vulcanizedb/pkg/config"
 	"github.com/vulcanize/vulcanizedb/pkg/core"
 	"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
 )
 
 const payloadChanBufferSize = 20000 // the max eth sub buffer size
 
-// SyncPublishScreenAndServe is an interface for streaming, converting to IPLDs, publishing,
-// indexing all Ethereum data screening this data, and serving it up to subscribed clients
+// SyncPublishScreenAndServe is the top level interface for streaming, converting to IPLDs, publishing,
+// and indexing all Ethereum data; screening this data; and serving it up to subscribed clients
 // This service is compatible with the Ethereum service interface (node.Service)
 type SyncPublishScreenAndServe interface {
 	// APIs(), Protocols(), Start() and Stop()
@@ -45,9 +47,9 @@ type SyncPublishScreenAndServe interface {
 	// Main event loop for handling client pub-sub
 	ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan IPLDPayload, receiveQuitchan <-chan bool)
 	// Method to subscribe to receive state diff processing output
-	Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription)
+	Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription)
 	// Method to unsubscribe from state diff processing
-	Unsubscribe(id rpc.ID) error
+	Unsubscribe(id rpc.ID)
 }
 
 // Service is the underlying struct for the SyncAndPublish interface
@@ -74,8 +76,10 @@ type Service struct {
 	PayloadChan chan statediff.Payload
 	// Used to signal shutdown of the service
 	QuitChan chan bool
-	// A mapping of rpc.IDs to their subscription channels
-	Subscriptions map[rpc.ID]Subscription
+	// A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters)
+	Subscriptions map[common.Hash]map[rpc.ID]Subscription
+	// A mapping of subscription hash type to the corresponding StreamFilters
+	SubscriptionTypes map[common.Hash]config.Subscription
 }
 
 // NewIPFSProcessor creates a new Processor interface using an underlying Processor struct
@@ -89,17 +93,18 @@ func NewIPFSProcessor(ipfsPath string, db *postgres.DB, ethClient core.EthClient
 		return nil, err
 	}
 	return &Service{
-		Streamer:      NewStateDiffStreamer(rpcClient),
-		Repository:    NewCIDRepository(db),
-		Converter:     NewPayloadConverter(ethClient),
-		Publisher:     publisher,
-		Screener:      NewResponseScreener(),
-		Fetcher:       fetcher,
-		Retriever:     NewCIDRetriever(db),
-		Resolver:      NewIPLDResolver(),
-		PayloadChan:   make(chan statediff.Payload, payloadChanBufferSize),
-		QuitChan:      qc,
-		Subscriptions: make(map[rpc.ID]Subscription),
+		Streamer:          NewStateDiffStreamer(rpcClient),
+		Repository:        NewCIDRepository(db),
+		Converter:         NewPayloadConverter(ethClient),
+		Publisher:         publisher,
+		Screener:          NewResponseScreener(),
+		Fetcher:           fetcher,
+		Retriever:         NewCIDRetriever(db),
+		Resolver:          NewIPLDResolver(),
+		PayloadChan:       make(chan statediff.Payload, payloadChanBufferSize),
+		QuitChan:          qc,
+		Subscriptions:     make(map[common.Hash]map[rpc.ID]Subscription),
+		SubscriptionTypes: make(map[common.Hash]config.Subscription),
 	}, nil
 }
 
@@ -196,40 +201,60 @@ func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, receivePayloadChan <-chan
 }
 
 func (sap *Service) processResponse(payload IPLDPayload) error {
-	for id, sub := range sap.Subscriptions {
-		response, err := sap.Screener.ScreenResponse(sub.StreamFilters, payload)
+	for ty, subs := range sap.Subscriptions {
+		// Retrieve the subscription parameters for this subscription type
+		subConfig, ok := sap.SubscriptionTypes[ty]
+		if !ok {
+			return fmt.Errorf("subscription configuration for subscription type %s not available", ty.Hex())
+		}
+		response, err := sap.Screener.ScreenResponse(subConfig, payload)
 		if err != nil {
 			return err
 		}
-		sap.serve(id, *response)
+		for id := range subs {
+			//TODO send payloads to this type of sub
+			sap.serve(id, *response, ty)
+
+		}
 	}
 	return nil
 }
 
 // Subscribe is used by the API to subscribe to the service loop
-func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters *config.Subscription) {
+func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan chan<- bool, streamFilters config.Subscription) {
 	log.Info("Subscribing to the seed node service")
+	// Subscription type is defined as the hash of its content
+	// Group subscriptions by type and screen payloads once for subs of the same type
+	by, err := rlp.EncodeToBytes(streamFilters)
+	if err != nil {
+		log.Error(err)
+	}
+	subscriptionHash := crypto.Keccak256(by)
+	subscriptionType := common.BytesToHash(subscriptionHash)
 	subscription := Subscription{
-		PayloadChan:   sub,
-		QuitChan:      quitChan,
-		StreamFilters: streamFilters,
+		PayloadChan: sub,
+		QuitChan:    quitChan,
 	}
 	// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
 	// Otherwise we only filter new data as it is streamed in from the state diffing geth node
 	if streamFilters.BackFill || streamFilters.BackFillOnly {
-		sap.backFill(subscription, id)
+		sap.backFill(subscription, id, streamFilters)
 	}
 	if !streamFilters.BackFillOnly {
 		sap.Lock()
-		sap.Subscriptions[id] = subscription
+		if sap.Subscriptions[subscriptionType] == nil {
+			sap.Subscriptions[subscriptionType] = make(map[rpc.ID]Subscription)
+		}
+		sap.Subscriptions[subscriptionType][id] = subscription
+		sap.SubscriptionTypes[subscriptionType] = streamFilters
 		sap.Unlock()
	}
 }
 
-func (sap *Service) backFill(sub Subscription, id rpc.ID) {
+func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscription) {
 	log.Debug("back-filling data for id", id)
 	// Retrieve cached CIDs relevant to this subscriber
-	cidWrappers, err := sap.Retriever.RetrieveCIDs(*sub.StreamFilters)
+	cidWrappers, err := sap.Retriever.RetrieveCIDs(con)
 	if err != nil {
 		sub.PayloadChan <- ResponsePayload{
 			ErrMsg: "CID retrieval error: " + err.Error(),
@@ -260,16 +285,18 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID) {
 }
 
 // Unsubscribe is used to unsubscribe to the StateDiffingService loop
-func (sap *Service) Unsubscribe(id rpc.ID) error {
+func (sap *Service) Unsubscribe(id rpc.ID) {
 	log.Info("Unsubscribing from the seed node service")
 	sap.Lock()
-	_, ok := sap.Subscriptions[id]
-	if !ok {
-		return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
+	for ty := range sap.Subscriptions {
+		delete(sap.Subscriptions[ty], id)
+		if len(sap.Subscriptions[ty]) == 0 {
+			// If we removed the last subscription of this type, remove the subscription type outright
+			delete(sap.Subscriptions, ty)
+			delete(sap.SubscriptionTypes, ty)
+		}
 	}
-	delete(sap.Subscriptions, id)
 	sap.Unlock()
-	return nil
 }
 
 // Start is used to begin the service
@@ -293,9 +320,9 @@ func (sap *Service) Stop() error {
 }
 
 // serve is used to send screened payloads to their requesting sub
-func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
+func (sap *Service) serve(id rpc.ID, payload ResponsePayload, ty common.Hash) {
 	sap.Lock()
-	sub, ok := sap.Subscriptions[id]
+	sub, ok := sap.Subscriptions[ty][id]
 	if ok {
 		select {
 		case sub.PayloadChan <- payload:
@@ -310,14 +337,17 @@ func (sap *Service) serve(id rpc.ID, payload ResponsePayload) {
 // close is used to close all listening subscriptions
 func (sap *Service) close() {
 	sap.Lock()
-	for id, sub := range sap.Subscriptions {
-		select {
-		case sub.QuitChan <- true:
-			log.Infof("closing subscription %s", id)
-		default:
-			log.Infof("unable to close subscription %s; channel has no receiver", id)
+	for ty, subs := range sap.Subscriptions {
+		for id, sub := range subs {
+			select {
+			case sub.QuitChan <- true:
+				log.Infof("closing subscription %s", id)
+			default:
+				log.Infof("unable to close subscription %s; channel has no receiver", id)
+			}
 		}
-		delete(sap.Subscriptions, id)
+		delete(sap.Subscriptions, ty)
+		delete(sap.SubscriptionTypes, ty)
 	}
 	sap.Unlock()
 }
diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go
index 0a33cf51..522241b7 100644
--- a/pkg/ipfs/types.go
+++ b/pkg/ipfs/types.go
@@ -20,23 +20,20 @@ import (
 	"encoding/json"
 	"math/big"
 
-	"github.com/vulcanize/vulcanizedb/pkg/config"
-
-	"github.com/ipfs/go-block-format"
-
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ipfs/go-block-format"
 )
 
 // Subscription holds the information for an individual client subscription
 type Subscription struct {
-	PayloadChan   chan<- ResponsePayload
-	QuitChan      chan<- bool
-	StreamFilters *config.Subscription
+	PayloadChan chan<- ResponsePayload
+	QuitChan    chan<- bool
 }
 
 // ResponsePayload holds the data returned from the seed node to the requesting client
 type ResponsePayload struct {
+	BlockNumber     *big.Int `json:"blockNumber"`
 	HeadersRlp      [][]byte `json:"headersRlp"`
 	UnclesRlp       [][]byte `json:"unclesRlp"`
 	TransactionsRlp [][]byte `json:"transactionsRlp"`
@@ -69,8 +66,9 @@ func (sd *ResponsePayload) Encode() ([]byte, error) {
 
 // CidWrapper is used to package CIDs retrieved from the local Postgres cache
 type CidWrapper struct {
-	BlockNumber  int64
+	BlockNumber  *big.Int
 	Headers      []string
+	Uncles       []string
 	Transactions []string
 	Receipts     []string
 	StateNodes   []StateNodeCID
@@ -79,7 +77,9 @@ type CidWrapper struct {
 
 // IpldWrapper is used to package raw IPLD block data for resolution
 type IpldWrapper struct {
+	BlockNumber  *big.Int
 	Headers      []blocks.Block
+	Uncles       []blocks.Block
 	Transactions []blocks.Block
 	Receipts     []blocks.Block
 	StateNodes   map[common.Hash]blocks.Block