diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index f27b5b88..24c79501 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -65,7 +65,7 @@ func streamEthSubscription() { str := streamer.NewSuperNodeStreamer(rpcClient) // Buffered channel for reading subscription payloads - payloadChan := make(chan super_node.Payload, 20000) + payloadChan := make(chan super_node.SubscriptionPayload, 20000) // Subscribe to the super node service with the given config/filter parameters sub, err := str.Stream(payloadChan, ethSubConfig) @@ -81,9 +81,9 @@ func streamEthSubscription() { logWithCommand.Error(payload.Err) continue } - data, ok := payload.Data.(eth.StreamPayload) + data, ok := payload.Data.(eth.StreamResponse) if !ok { - logWithCommand.Warnf("payload data expected type %T got %T", eth.StreamPayload{}, payload.Data) + logWithCommand.Warnf("payload data expected type %T got %T", eth.StreamResponse{}, payload.Data) continue } for _, headerRlp := range data.HeadersRlp { diff --git a/cmd/superNode.go b/cmd/superNode.go index 1e777dc8..d2088583 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -18,6 +18,8 @@ package cmd import ( "sync" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -59,10 +61,10 @@ func superNode() { } wg := &sync.WaitGroup{} var forwardQuitChan chan bool - var forwardPayloadChan chan interface{} + var forwardPayloadChan chan shared.StreamedIPLDs if superNodeConfig.Serve { forwardQuitChan = make(chan bool) - forwardPayloadChan = make(chan interface{}, super_node.PayloadChanBufferSize) + forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize) superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan) if err := startServers(superNode, superNodeConfig); err != nil { logWithCommand.Fatal(err) diff --git a/libraries/shared/streamer/super_node_streamer.go b/libraries/shared/streamer/super_node_streamer.go index df9708c5..a6bbf7b8 100644 --- a/libraries/shared/streamer/super_node_streamer.go +++ b/libraries/shared/streamer/super_node_streamer.go @@ -19,14 +19,15 @@ package streamer import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/pkg/eth/core" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node type ISuperNodeStreamer interface { - Stream(payloadChan chan super_node.Payload, params super_node.SubscriptionSettings) (*rpc.ClientSubscription, error) + Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) } // SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface @@ -42,6 +43,6 @@ func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer { } // Stream is the main loop for subscribing to data from a vulcanizedb super node -func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.Payload, params super_node.SubscriptionSettings) (*rpc.ClientSubscription, error) { +func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) { return sds.Client.Subscribe("vdb", payloadChan, "stream", params) } diff --git a/libraries/shared/transformer/super_node_transformer.go b/libraries/shared/transformer/super_node_transformer.go index fa5eac55..a6412947 100644 --- a/libraries/shared/transformer/super_node_transformer.go +++ b/libraries/shared/transformer/super_node_transformer.go @@ -19,13 +19,13 @@ package transformer import ( "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) type SuperNodeTransformer interface { Init() error Execute() error - GetConfig() super_node.SubscriptionSettings + GetConfig() shared.SubscriptionSettings } -type SuperNodeTransformerInitializer func(db *postgres.DB, subCon super_node.SubscriptionSettings, client core.RPCClient) SuperNodeTransformer +type SuperNodeTransformerInitializer func(db *postgres.DB, subCon shared.SubscriptionSettings, client core.RPCClient) SuperNodeTransformer diff --git a/pkg/super_node/api.go b/pkg/super_node/api.go index ec9ae72e..32dcf2c2 100644 --- a/pkg/super_node/api.go +++ b/pkg/super_node/api.go @@ -19,6 +19,8 @@ package super_node import ( "context" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" @@ -44,7 +46,7 @@ func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI { } // Stream is the public method to setup a subscription that fires off super node payloads as they are processed -func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params SubscriptionSettings) (*rpc.Subscription, error) { +func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params shared.SubscriptionSettings) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -56,7 +58,7 @@ func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params SubscriptionSe go func() { // subscribe to events from the SyncPublishScreenAndServe service - payloadChannel := make(chan Payload, PayloadChanBufferSize) + payloadChannel := make(chan SubscriptionPayload, PayloadChanBufferSize) quitChan := make(chan bool, 1) go api.sn.Subscribe(rpcSub.ID, payloadChannel, quitChan, params) diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index fa31b74e..a9ad071b 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -22,14 +22,11 @@ import ( "sync/atomic" "time" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" - - "github.com/ethereum/go-ethereum/params" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) const ( @@ -71,7 +68,7 @@ func NewBackFillService(settings *config.SuperNode) (BackFillInterface, error) { if err != nil { return nil, err } - converter, err := NewPayloadConverter(settings.Chain, params.MainnetChainConfig) + converter, err := NewPayloadConverter(settings.Chain) if err != nil { return nil, err } diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 595cb9ff..c4d539d1 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -41,7 +40,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, ReturnErr: nil, } mockRetriever := &mocks.MockCIDRetriever{ @@ -53,7 +52,7 @@ var _ = Describe("BackFiller", func() { }, } mockFetcher := &mocks.StateDiffFetcher{ - PayloadsToReturn: map[uint64]statediff.Payload{ + PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, 101: mocks.MockStateDiffPayload, }, @@ -95,7 +94,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload}, ReturnErr: nil, } mockRetriever := &mocks.MockCIDRetriever{ @@ -107,7 +106,7 @@ var _ = Describe("BackFiller", func() { }, } mockFetcher := &mocks.StateDiffFetcher{ - PayloadsToReturn: map[uint64]statediff.Payload{ + PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, }, } @@ -145,7 +144,7 @@ var _ = Describe("BackFiller", func() { ReturnErr: nil, } mockConverter := &mocks.IterativePayloadConverter{ - ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, + ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, ReturnErr: nil, } mockRetriever := &mocks.MockCIDRetriever{ @@ -153,7 +152,7 @@ var _ = Describe("BackFiller", func() { GapsToRetrieve: []shared.Gap{}, } mockFetcher := &mocks.StateDiffFetcher{ - PayloadsToReturn: map[uint64]statediff.Payload{ + PayloadsToReturn: map[uint64]shared.RawChainData{ 1: mocks.MockStateDiffPayload, 2: mocks.MockStateDiffPayload, }, diff --git a/pkg/super_node/btc/converter.go b/pkg/super_node/btc/converter.go index 28616de6..79d7f762 100644 --- a/pkg/super_node/btc/converter.go +++ b/pkg/super_node/btc/converter.go @@ -18,6 +18,8 @@ package btc import ( "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // PayloadConverter satisfies the PayloadConverter interface for bitcoin @@ -30,7 +32,7 @@ func NewPayloadConverter() *PayloadConverter { // Convert method is used to convert a bitcoin BlockPayload to an IPLDPayload // Satisfies the shared.PayloadConverter interface -func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { btcBlockPayload, ok := payload.(BlockPayload) if !ok { return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload) diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index 1d2a2062..72d360f2 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -48,8 +48,8 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) { - ipldPayload, ok := payload.(*IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(IPLDPayload) if !ok { return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload) } diff --git a/pkg/super_node/btc/streamer.go b/pkg/super_node/btc/streamer.go index cd3960f5..dbaf0da6 100644 --- a/pkg/super_node/btc/streamer.go +++ b/pkg/super_node/btc/streamer.go @@ -43,7 +43,7 @@ func NewPayloadStreamer(clientConfig *rpcclient.ConnConfig) *PayloadStreamer { // Stream is the main loop for subscribing to data from the btc block notifications // Satisfies the shared.PayloadStreamer interface -func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) { +func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { logrus.Info("streaming block payloads from btc") blockNotificationHandler := rpcclient.NotificationHandlers{ // Notification handler for block connections, forwards new block data to the payloadChan diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index 41714a94..dbec9fbc 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -20,6 +20,8 @@ import ( "encoding/json" "math/big" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/ipfs/go-block-format" @@ -40,6 +42,10 @@ type IPLDPayload struct { TxMetaData []TxModel } +func (ip IPLDPayload) Value() shared.StreamedIPLDs { + return ip +} + // CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres // Returned by IPLDPublisher // Passed to CIDIndexer diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 09473c13..7eec8f27 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -64,7 +64,7 @@ func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriev } // NewPayloadStreamer constructs a PayloadStreamer for the provided chain type -func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan interface{}, error) { +func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { switch chain { case config.Ethereum: ethClient, ok := client.(core.RPCClient) @@ -72,14 +72,14 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl var expectedClientType core.RPCClient return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, client) } - streamChan := make(chan interface{}, eth.PayloadChanBufferSize) + streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) return eth.NewPayloadStreamer(ethClient), streamChan, nil case config.Bitcoin: btcClientConn, ok := client.(*rpcclient.ConnConfig) if !ok { return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client) } - streamChan := make(chan interface{}, btc.PayloadChanBufferSize) + streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize) return btc.NewPayloadStreamer(btcClientConn), streamChan, nil default: return nil, nil, fmt.Errorf("invalid chain %T for streamer constructor", chain) @@ -102,14 +102,10 @@ func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.Paylo } // NewPayloadConverter constructs a PayloadConverter for the provided chain type -func NewPayloadConverter(chain config.ChainType, settings interface{}) (shared.PayloadConverter, error) { +func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error) { switch chain { case config.Ethereum: - ethConfig, ok := settings.(*params.ChainConfig) - if !ok { - return nil, fmt.Errorf("ethereum converter constructor expected config type %T got %T", ¶ms.ChainConfig{}, settings) - } - return eth.NewPayloadConverter(ethConfig), nil + return eth.NewPayloadConverter(params.MainnetChainConfig), nil case config.Bitcoin: return btc.NewPayloadConverter(), nil default: diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index a01dec9a..a6c614c6 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -19,6 +19,8 @@ package eth import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" @@ -40,7 +42,7 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter { // Convert method is used to convert a eth statediff.Payload to an IPLDPayload // Satisfies the shared.PayloadConverter interface -func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("eth converter: expected payload type %T got %T", statediff.Payload{}, payload) @@ -51,7 +53,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { return nil, err } trxLen := len(block.Transactions()) - convertedPayload := &IPLDPayload{ + convertedPayload := IPLDPayload{ TotalDifficulty: stateDiffPayload.TotalDifficulty, Block: block, TxMetaData: make([]TxModel, 0, trxLen), diff --git a/pkg/super_node/eth/converter_test.go b/pkg/super_node/eth/converter_test.go index bbff68cb..a75b4586 100644 --- a/pkg/super_node/eth/converter_test.go +++ b/pkg/super_node/eth/converter_test.go @@ -32,7 +32,7 @@ var _ = Describe("Converter", func() { converter := eth.NewPayloadConverter(params.MainnetChainConfig) payload, err := converter.Convert(mocks.MockStateDiffPayload) Expect(err).ToNot(HaveOccurred()) - convertedPayload, ok := payload.(*eth.IPLDPayload) + convertedPayload, ok := payload.(eth.IPLDPayload) Expect(ok).To(BeTrue()) Expect(convertedPayload.Block.Number().String()).To(Equal(mocks.BlockNumber.String())) Expect(convertedPayload.Block.Hash().String()).To(Equal(mocks.MockBlock.Hash().String())) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 4e660e25..33c9f918 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -20,6 +20,8 @@ import ( "bytes" "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" @@ -37,44 +39,44 @@ func NewResponseFilterer() *ResponseFilterer { } // Filter is used to filter through eth data to extract and package requested data into a Payload -func (s *ResponseFilterer) Filter(filter, payload interface{}) (interface{}, error) { +func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { ethFilters, ok := filter.(*config.EthSubscription) if !ok { - return StreamPayload{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter) + return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter) } - ethPayload, ok := payload.(*IPLDPayload) + ethPayload, ok := payload.(IPLDPayload) if !ok { - return StreamPayload{}, fmt.Errorf("eth filterer expected payload type %T got %T", &IPLDPayload{}, payload) + return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload) } if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) { - response := new(StreamPayload) + response := new(StreamResponse) if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil { - return StreamPayload{}, err + return StreamResponse{}, err } txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload) if err != nil { - return StreamPayload{}, err + return StreamResponse{}, err } var filterTxs []common.Hash if ethFilters.ReceiptFilter.MatchTxs { filterTxs = txHashes } if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil { - return StreamPayload{}, err + return StreamResponse{}, err } if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil { - return StreamPayload{}, err + return StreamResponse{}, err } if err := s.filterStorage(ethFilters.StorageFilter, response, ethPayload); err != nil { - return StreamPayload{}, err + return StreamResponse{}, err } response.BlockNumber = ethPayload.Block.Number() return *response, nil } - return StreamPayload{}, nil + return StreamResponse{}, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamPayload, payload *IPLDPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamResponse, payload IPLDPayload) error { if !headerFilter.Off { headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) if err != nil { @@ -102,7 +104,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamPayload, payload *IPLDPayload) ([]common.Hash, error) { +func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) if !trxFilter.Off { for i, trx := range payload.Block.Body().Transactions { @@ -137,7 +139,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamPayload, payload *IPLDPayload, trxHashes []common.Hash) error { +func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { for i, receipt := range payload.Receipts { // topics is always length 4 @@ -221,7 +223,7 @@ func slicesShareString(slice1, slice2 []string) int { return 0 } -func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamPayload, payload *IPLDPayload) error { +func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamResponse, payload IPLDPayload) error { if !stateFilter.Off { response.StateNodesRlp = make(map[common.Hash][]byte) keyFilters := make([]common.Hash, len(stateFilter.Addresses)) @@ -252,7 +254,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamPayload, payload *IPLDPayload) error { +func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamResponse, payload IPLDPayload) error { if !storageFilter.Off { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses)) diff --git a/pkg/super_node/eth/filterer_test.go b/pkg/super_node/eth/filterer_test.go index 73868627..745f1c0a 100644 --- a/pkg/super_node/eth/filterer_test.go +++ b/pkg/super_node/eth/filterer_test.go @@ -45,7 +45,7 @@ var _ = Describe("Filterer", func() { It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() { payload, err := filterer.Filter(openFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload, ok := payload.(eth.StreamPayload) + superNodePayload, ok := payload.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp)) @@ -66,7 +66,7 @@ var _ = Describe("Filterer", func() { It("Applies filters from the provided config.Subscription", func() { payload1, err := filterer.Filter(rctContractFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload1, ok := payload1.(eth.StreamPayload) + superNodePayload1, ok := payload1.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0)) @@ -79,7 +79,7 @@ var _ = Describe("Filterer", func() { payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload2, ok := payload2.(eth.StreamPayload) + superNodePayload2, ok := payload2.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0)) @@ -92,7 +92,7 @@ var _ = Describe("Filterer", func() { payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload3, ok := payload3.(eth.StreamPayload) + superNodePayload3, ok := payload3.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0)) @@ -105,7 +105,7 @@ var _ = Describe("Filterer", func() { payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload4, ok := payload4.(eth.StreamPayload) + superNodePayload4, ok := payload4.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0)) @@ -118,7 +118,7 @@ var _ = Describe("Filterer", func() { payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload5, ok := payload5.(eth.StreamPayload) + superNodePayload5, ok := payload5.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0)) @@ -134,7 +134,7 @@ var _ = Describe("Filterer", func() { payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload6, ok := payload6.(eth.StreamPayload) + superNodePayload6, ok := payload6.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0)) @@ -148,7 +148,7 @@ var _ = Describe("Filterer", func() { payload7, err := filterer.Filter(stateFilter, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload7, ok := payload7.(eth.StreamPayload) + superNodePayload7, ok := payload7.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0)) @@ -161,7 +161,7 @@ var _ = Describe("Filterer", func() { payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) - superNodePayload8, ok := payload8.(eth.StreamPayload) + superNodePayload8, ok := payload8.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload8.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(len(superNodePayload8.HeadersRlp)).To(Equal(0)) diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index 9e93e2bc..faadef9e 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -19,6 +19,8 @@ package eth import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" "github.com/lib/pq" @@ -40,7 +42,7 @@ func NewCIDIndexer(db *postgres.DB) *CIDIndexer { } // Index indexes a cidPayload in Postgres -func (in *CIDIndexer) Index(cids interface{}) error { +func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { cidPayload, ok := cids.(*CIDPayload) if !ok { return fmt.Errorf("eth indexer expected cids type %T got %T", &CIDPayload{}, cids) diff --git a/pkg/super_node/eth/ipld_fetcher.go b/pkg/super_node/eth/ipld_fetcher.go index 8b3f5056..c9220f5e 100644 --- a/pkg/super_node/eth/ipld_fetcher.go +++ b/pkg/super_node/eth/ipld_fetcher.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" "github.com/ipfs/go-blockservice" @@ -51,7 +53,7 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids interface{}) (interface{}, error) { +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) diff --git a/pkg/super_node/eth/mocks/converter.go b/pkg/super_node/eth/mocks/converter.go index 65761f3f..428d0d0b 100644 --- a/pkg/super_node/eth/mocks/converter.go +++ b/pkg/super_node/eth/mocks/converter.go @@ -19,6 +19,8 @@ package mocks import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" @@ -27,12 +29,12 @@ import ( // PayloadConverter is the underlying struct for the Converter interface type PayloadConverter struct { PassedStatediffPayload statediff.Payload - ReturnIPLDPayload *eth.IPLDPayload + ReturnIPLDPayload eth.IPLDPayload ReturnErr error } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) @@ -44,13 +46,13 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) { // IterativePayloadConverter is the underlying struct for the Converter interface type IterativePayloadConverter struct { PassedStatediffPayload []statediff.Payload - ReturnIPLDPayload []*eth.IPLDPayload + ReturnIPLDPayload []eth.IPLDPayload ReturnErr error iteration int } // Convert method is used to convert a geth statediff.Payload to a IPLDPayload -func (pc *IterativePayloadConverter) Convert(payload interface{}) (interface{}, error) { +func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { stateDiffPayload, ok := payload.(statediff.Payload) if !ok { return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload) diff --git a/pkg/super_node/eth/mocks/fetcher.go b/pkg/super_node/eth/mocks/fetcher.go index 542ca0b8..dd9bc3cb 100644 --- a/pkg/super_node/eth/mocks/fetcher.go +++ b/pkg/super_node/eth/mocks/fetcher.go @@ -20,25 +20,25 @@ import ( "errors" "sync/atomic" - "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // StateDiffFetcher mock for tests type StateDiffFetcher struct { - PayloadsToReturn map[uint64]statediff.Payload + PayloadsToReturn map[uint64]shared.RawChainData FetchErrs map[uint64]error CalledAtBlockHeights [][]uint64 CalledTimes int64 } // FetchStateDiffsAt mock method -func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]interface{}, error) { +func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { if fetcher.PayloadsToReturn == nil { return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") } atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights) - results := make([]interface{}, 0, len(blockHeights)) + results := make([]shared.RawChainData, 0, len(blockHeights)) for _, height := range blockHeights { results = append(results, fetcher.PayloadsToReturn[height]) err, ok := fetcher.FetchErrs[height] diff --git a/pkg/super_node/eth/mocks/indexer.go b/pkg/super_node/eth/mocks/indexer.go index 05e6cd38..c05cac79 100644 --- a/pkg/super_node/eth/mocks/indexer.go +++ b/pkg/super_node/eth/mocks/indexer.go @@ -19,6 +19,8 @@ package mocks import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" ) @@ -29,7 +31,7 @@ type CIDIndexer struct { } // Index indexes a cidPayload in Postgres -func (repo *CIDIndexer) Index(cids interface{}) error { +func (repo *CIDIndexer) Index(cids shared.CIDsForIndexing) error { cidPayload, ok := cids.(*eth.CIDPayload) if !ok { return fmt.Errorf("index expected cids type %T got %T", ð.CIDPayload{}, cids) diff --git a/pkg/super_node/eth/mocks/publisher.go b/pkg/super_node/eth/mocks/publisher.go index 6b85ff66..14887938 100644 --- a/pkg/super_node/eth/mocks/publisher.go +++ b/pkg/super_node/eth/mocks/publisher.go @@ -19,19 +19,21 @@ package mocks import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" ) // IPLDPublisher is the underlying struct for the Publisher interface type IPLDPublisher struct { - PassedIPLDPayload *eth.IPLDPayload + PassedIPLDPayload eth.IPLDPayload ReturnCIDPayload *eth.CIDPayload ReturnErr error } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) { - ipldPayload, ok := payload.(*eth.IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(eth.IPLDPayload) if !ok { return nil, fmt.Errorf("publish expected payload type %T got %T", ð.IPLDPayload{}, payload) } @@ -41,15 +43,15 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) { // IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing type IterativeIPLDPublisher struct { - PassedIPLDPayload []*eth.IPLDPayload + PassedIPLDPayload []eth.IPLDPayload ReturnCIDPayload []*eth.CIDPayload ReturnErr error iteration int } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IterativeIPLDPublisher) Publish(payload interface{}) (interface{}, error) { - ipldPayload, ok := payload.(*eth.IPLDPayload) +func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(eth.IPLDPayload) if !ok { return nil, fmt.Errorf("publish expected payload type %T got %T", ð.IPLDPayload{}, payload) } diff --git a/pkg/super_node/eth/mocks/retriever.go b/pkg/super_node/eth/mocks/retriever.go index 0a678e6f..266fe108 100644 --- a/pkg/super_node/eth/mocks/retriever.go +++ b/pkg/super_node/eth/mocks/retriever.go @@ -31,7 +31,7 @@ type MockCIDRetriever struct { } // RetrieveCIDs mock method -func (*MockCIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) { +func (*MockCIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { panic("implement me") } diff --git a/pkg/super_node/eth/mocks/streamer.go b/pkg/super_node/eth/mocks/streamer.go index cc8a6b97..a4e49aad 100644 --- a/pkg/super_node/eth/mocks/streamer.go +++ b/pkg/super_node/eth/mocks/streamer.go @@ -24,14 +24,14 @@ import ( // StateDiffStreamer is the underlying struct for the Streamer interface type StateDiffStreamer struct { - PassedPayloadChan chan interface{} + PassedPayloadChan chan shared.RawChainData ReturnSub *rpc.ClientSubscription ReturnErr error StreamPayloads []statediff.Payload } // Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *StateDiffStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) { +func (sds *StateDiffStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { sds.PassedPayloadChan = payloadChan go func() { diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index 361121a9..f9398cf2 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -233,7 +233,7 @@ var ( TotalDifficulty: big.NewInt(1337), } - MockIPLDPayload = ð.IPLDPayload{ + MockIPLDPayload = eth.IPLDPayload{ TotalDifficulty: big.NewInt(1337), Block: MockBlock, Receipts: MockReceipts, @@ -318,7 +318,7 @@ var ( }, } - MockSeedNodePayload = eth2.StreamPayload{ + MockSeedNodePayload = eth2.StreamResponse{ BlockNumber: big.NewInt(1), HeadersRlp: [][]byte{MockHeaderRlp}, UnclesRlp: [][]byte{}, diff --git a/pkg/super_node/eth/payload_fetcher.go b/pkg/super_node/eth/payload_fetcher.go index 2029fd07..ac12aa41 100644 --- a/pkg/super_node/eth/payload_fetcher.go +++ b/pkg/super_node/eth/payload_fetcher.go @@ -19,6 +19,8 @@ package eth import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/eth/client" @@ -47,7 +49,7 @@ func NewPayloadFetcher(bc BatchClient) *PayloadFetcher { // FetchAt fetches the statediff payloads at the given block heights // Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) -func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]interface{}, error) { +func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { batch := make([]client.BatchElem, 0) for _, height := range blockHeights { batch = append(batch, client.BatchElem{ @@ -60,7 +62,7 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]interface{}, er if batchErr != nil { return nil, fmt.Errorf("PayloadFetcher err: %s", batchErr.Error()) } - results := make([]interface{}, 0, len(blockHeights)) + results := make([]shared.RawChainData, 0, len(blockHeights)) for _, batchElem := range batch { if batchElem.Error != nil { return nil, fmt.Errorf("PayloadFetcher err: %s", batchElem.Error.Error()) diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 9720514c..f584893d 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -53,10 +53,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) { - ipldPayload, ok := payload.(*IPLDPayload) +func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(IPLDPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload) + return nil, fmt.Errorf("eth publisher expected payload type %T got %T", IPLDPayload{}, payload) } // Process and publish headers headerCid, err := pub.publishHeader(ipldPayload.Block.Header()) diff --git a/pkg/super_node/eth/resolver.go b/pkg/super_node/eth/resolver.go index d8783e04..801759ce 100644 --- a/pkg/super_node/eth/resolver.go +++ b/pkg/super_node/eth/resolver.go @@ -19,6 +19,8 @@ package eth import ( "fmt" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-block-format" ) @@ -32,12 +34,12 @@ func NewIPLDResolver() *IPLDResolver { } // Resolve is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper -func (eir *IPLDResolver) Resolve(iplds interface{}) (interface{}, error) { +func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) { ipfsBlocks, ok := iplds.(*IPLDWrapper) if !ok { - return StreamPayload{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds) + return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds) } - return StreamPayload{ + return StreamResponse{ BlockNumber: ipfsBlocks.BlockNumber, HeadersRlp: eir.ResolveHeaders(ipfsBlocks.Headers), UnclesRlp: eir.ResolveUncles(ipfsBlocks.Uncles), diff --git a/pkg/super_node/eth/resolver_test.go b/pkg/super_node/eth/resolver_test.go index eae9d300..66115aca 100644 --- a/pkg/super_node/eth/resolver_test.go +++ b/pkg/super_node/eth/resolver_test.go @@ -37,7 +37,7 @@ var _ = Describe("Resolver", func() { It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { payload, err := resolver.Resolve(mocks.MockIPLDWrapper) Expect(err).ToNot(HaveOccurred()) - superNodePayload, ok := payload.(eth.StreamPayload) + superNodePayload, ok := payload.(eth.StreamResponse) Expect(ok).To(BeTrue()) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64())) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp)) diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 7f645d6c..3dff18af 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -58,7 +58,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { } // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) { +func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { streamFilter, ok := filter.(*config.EthSubscription) if !ok { return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &config.EthSubscription{}, filter) diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index e0ad3ff7..17a8f8e5 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -41,7 +41,7 @@ func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { // Stream is the main loop for subscribing to data from the Geth state diff process // Satisfies the shared.PayloadStreamer interface -func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) { +func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { logrus.Info("streaming diffs from geth") return ps.Client.Subscribe("statediff", payloadChan, "stream") } diff --git a/pkg/super_node/eth/streamer_test.go b/pkg/super_node/eth/streamer_test.go index cc202d5f..c139c392 100644 --- a/pkg/super_node/eth/streamer_test.go +++ b/pkg/super_node/eth/streamer_test.go @@ -17,6 +17,7 @@ package eth_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/eth/fakes" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" @@ -26,7 +27,7 @@ var _ = Describe("StateDiff Streamer", func() { It("subscribes to the geth statediff service", func() { client := &fakes.MockRPCClient{} streamer := eth.NewPayloadStreamer(client) - payloadChan := make(chan interface{}) + payloadChan := make(chan shared.RawChainData) _, err := streamer.Stream(payloadChan) Expect(err).NotTo(HaveOccurred()) client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 011965fb..81729381 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -20,6 +20,8 @@ import ( "encoding/json" "math/big" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ipfs/go-block-format" @@ -38,6 +40,10 @@ type IPLDPayload struct { StorageNodes map[common.Hash][]TrieNode } +func (ip IPLDPayload) Value() shared.StreamedIPLDs { + return ip +} + // Trie struct used to flag node as leaf or not type TrieNode struct { Key common.Hash @@ -83,10 +89,10 @@ type IPLDWrapper struct { StorageNodes map[common.Hash]map[common.Hash]blocks.Block } -// StreamPayload holds the data streamed from the super node eth service to the requesting clients +// StreamResponse holds the data streamed from the super node eth service to the requesting clients // Returned by IPLDResolver and ResponseFilterer // Passed to client subscriptions -type StreamPayload struct { +type StreamResponse struct { BlockNumber *big.Int `json:"blockNumber"` HeadersRlp [][]byte `json:"headersRlp"` UnclesRlp [][]byte `json:"unclesRlp"` @@ -99,20 +105,24 @@ type StreamPayload struct { err error } -func (sd *StreamPayload) ensureEncoded() { +func (sr StreamResponse) Value() shared.ServerResponse { + return sr +} + +func (sd *StreamResponse) ensureEncoded() { if sd.encoded == nil && sd.err == nil { sd.encoded, sd.err = json.Marshal(sd) } } // Length to implement Encoder interface for StateDiff -func (sd *StreamPayload) Length() int { +func (sd *StreamResponse) Length() int { sd.ensureEncoded() return len(sd.encoded) } // Encode to implement Encoder interface for StateDiff -func (sd *StreamPayload) Encode() ([]byte, error) { +func (sd *StreamResponse) Encode() ([]byte, error) { sd.ensureEncoded() return sd.encoded, sd.err } diff --git a/pkg/super_node/helpers.go b/pkg/super_node/helpers.go index b175ca28..50f80afd 100644 --- a/pkg/super_node/helpers.go +++ b/pkg/super_node/helpers.go @@ -21,7 +21,7 @@ import log "github.com/sirupsen/logrus" func sendNonBlockingErr(sub Subscription, err error) { log.Error(err) select { - case sub.PayloadChan <- Payload{nil, err.Error()}: + case sub.PayloadChan <- SubscriptionPayload{nil, err.Error()}: default: log.Infof("unable to send error to subscription %s", sub.ID) } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index ab1471d7..af0bdb42 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -24,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" @@ -47,11 +46,11 @@ type SuperNode interface { // APIs(), Protocols(), Start() and Stop() node.Service // Main event loop for syncAndPublish processes - SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- interface{}, forwardQuitchan chan<- bool) error + SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs, forwardQuitchan chan<- bool) error // Main event loop for handling client pub-sub - ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan interface{}, screenAndServeQuit <-chan bool) + ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) // Method to subscribe to receive state diff processing output - Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params SubscriptionSettings) + Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) // Method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) // Method to access the node info for this service @@ -79,13 +78,13 @@ type Service struct { // Interface for resolving IPLDs to their data types Resolver shared.IPLDResolver // Chan the processor uses to subscribe to payloads from the Streamer - PayloadChan chan interface{} + PayloadChan chan shared.RawChainData // Used to signal shutdown of the service QuitChan chan bool // A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters) Subscriptions map[common.Hash]map[rpc.ID]Subscription // A mapping of subscription params hash to the corresponding subscription params - SubscriptionTypes map[common.Hash]SubscriptionSettings + SubscriptionTypes map[common.Hash]shared.SubscriptionSettings // Info for the Geth node that this super node is working with NodeInfo core.Node // Number of publishAndIndex workers @@ -111,7 +110,7 @@ func NewSuperNode(settings *config.SuperNode) (SuperNode, error) { if err != nil { return nil, err } - sn.Converter, err = NewPayloadConverter(settings.Chain, params.MainnetChainConfig) + sn.Converter, err = NewPayloadConverter(settings.Chain) if err != nil { return nil, err } @@ -145,7 +144,7 @@ func NewSuperNode(settings *config.SuperNode) (SuperNode, error) { } sn.QuitChan = settings.Quit sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) - sn.SubscriptionTypes = make(map[common.Hash]SubscriptionSettings) + sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.WorkerPoolSize = settings.Workers sn.NodeInfo = settings.NodeInfo sn.ipfsPath = settings.IPFSPath @@ -180,7 +179,7 @@ func (sap *Service) APIs() []rpc.API { // SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids // This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop // which filters and sends relevant data to client subscriptions, if there are any -func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- interface{}, screenAndServeQuit chan<- bool) error { +func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs, screenAndServeQuit chan<- bool) error { sub, err := sap.Streamer.Stream(sap.PayloadChan) if err != nil { return err @@ -188,7 +187,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha wg.Add(1) // Channels for forwarding data to the publishAndIndex workers - publishAndIndexPayload := make(chan interface{}, PayloadChanBufferSize) + publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) publishAndIndexQuit := make(chan bool, sap.WorkerPoolSize) // publishAndIndex worker pool to handle publishing and indexing concurrently, while // limiting the number of Postgres connections we can possibly open so as to prevent error @@ -204,14 +203,14 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha log.Error(err) continue } - // If we have a ScreenAndServe process running, forward the payload to it + // If we have a ScreenAndServe process running, forward the iplds to it select { - case screenAndServePayload <- ipldPayload: + case screenAndServePayload <- ipldPayload.Value(): default: } // Forward the payload to the publishAndIndex workers select { - case publishAndIndexPayload <- ipldPayload: + case publishAndIndexPayload <- ipldPayload.Value(): default: } case err := <-sub.Err(): @@ -239,7 +238,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha return nil } -func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan interface{}, publishAndIndexQuit <-chan bool) { +func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs, publishAndIndexQuit <-chan bool) { go func() { for { select { @@ -263,7 +262,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan interf // ScreenAndServe is the loop used to screen data streamed from the state diffing eth node // and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration -func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan interface{}, screenAndServeQuit <-chan bool) { +func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) { wg.Add(1) go func() { for { @@ -280,7 +279,7 @@ func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-c log.Info("screenAndServe goroutine successfully spun up") } -func (sap *Service) sendResponse(payload interface{}) { +func (sap *Service) sendResponse(payload shared.StreamedIPLDs) { sap.Lock() for ty, subs := range sap.Subscriptions { // Retrieve the subscription parameters for this subscription type @@ -298,7 +297,7 @@ func (sap *Service) sendResponse(payload interface{}) { } for id, sub := range subs { select { - case sub.PayloadChan <- Payload{response, ""}: + case sub.PayloadChan <- SubscriptionPayload{response.Value(), ""}: log.Infof("sending super node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) @@ -309,8 +308,8 @@ func (sap *Service) sendResponse(payload interface{}) { } // Subscribe is used by the API to subscribe to the service loop -// The params must be rlp serializable and satisfy the Params() interface -func (sap *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params SubscriptionSettings) { +// The params must be rlp serializable and satisfy the SubscriptionSettings() interface +func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) { log.Info("Subscribing to the super node service") subscription := Subscription{ ID: id, @@ -351,7 +350,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo } } -func (sap *Service) backFill(sub Subscription, id rpc.ID, params SubscriptionSettings) error { +func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error { log.Debug("sending historical data for subscriber", id) // Retrieve cached CIDs relevant to this subscriber var endingBlock int64 @@ -394,7 +393,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params SubscriptionSet continue } select { - case sub.PayloadChan <- Payload{backFillIplds, ""}: + case sub.PayloadChan <- SubscriptionPayload{backFillIplds.Value(), ""}: log.Infof("sending super node historical data payload to subscription %s", id) default: log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) @@ -423,7 +422,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) { func (sap *Service) Start(*p2p.Server) error { log.Info("Starting super node service") wg := new(sync.WaitGroup) - payloadChan := make(chan interface{}, PayloadChanBufferSize) + payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize) quitChan := make(chan bool, 1) if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil { return err diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index 54c11a6c..14c39001 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" @@ -33,7 +35,7 @@ var _ = Describe("Service", func() { Describe("SyncAndPublish", func() { It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() { wg := new(sync.WaitGroup) - payloadChan := make(chan interface{}, 1) + payloadChan := make(chan shared.RawChainData, 1) quitChan := make(chan bool, 1) mockCidIndexer := &mocks2.CIDIndexer{ ReturnErr: nil, diff --git a/pkg/super_node/shared/functions.go b/pkg/super_node/shared/functions.go index 856e77d0..9b21409a 100644 --- a/pkg/super_node/shared/functions.go +++ b/pkg/super_node/shared/functions.go @@ -16,7 +16,10 @@ package shared -import "bytes" +import ( + "bytes" + "reflect" +) // ListContainsString used to check if a list of strings contains a particular string func ListContainsString(sss []string, s string) bool { @@ -47,3 +50,8 @@ func ListContainsGap(gapList []Gap, gap Gap) bool { } return false } + +// IsPointer returns true if the concrete type underneath the provided interface is a pointer +func IsPointer(i interface{}) bool { + return reflect.ValueOf(i).Type().Kind() == reflect.Ptr +} diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index c619bb83..80945c97 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -16,39 +16,45 @@ package shared +import ( + "math/big" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/config" +) + // PayloadStreamer streams chain-specific payloads to the provided channel type PayloadStreamer interface { - Stream(payloadChan chan interface{}) (ClientSubscription, error) + Stream(payloadChan chan RawChainData) (ClientSubscription, error) } // PayloadFetcher fetches chain-specific payloads type PayloadFetcher interface { - FetchAt(blockHeights []uint64) ([]interface{}, error) + FetchAt(blockHeights []uint64) ([]RawChainData, error) } // PayloadConverter converts chain-specific payloads into IPLD payloads for publishing type PayloadConverter interface { - Convert(payload interface{}) (interface{}, error) + Convert(payload RawChainData) (StreamedIPLDs, error) } // IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing type IPLDPublisher interface { - Publish(payload interface{}) (interface{}, error) + Publish(payload StreamedIPLDs) (CIDsForIndexing, error) } // CIDIndexer indexes a CID payload in Postgres type CIDIndexer interface { - Index(cids interface{}) error + Index(cids CIDsForIndexing) error } // ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet type ResponseFilterer interface { - Filter(filter, payload interface{}) (response interface{}, err error) + Filter(filter SubscriptionSettings, payload StreamedIPLDs) (response ServerResponse, err error) } // CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper type CIDRetriever interface { - Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) + Retrieve(filter SubscriptionSettings, blockNumber int64) (CIDsForFetching, bool, error) RetrieveFirstBlockNumber() (int64, error) RetrieveLastBlockNumber() (int64, error) RetrieveGapsInData() ([]Gap, error) @@ -56,12 +62,12 @@ type CIDRetriever interface { // IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper type IPLDFetcher interface { - Fetch(cids interface{}) (interface{}, error) + Fetch(cids CIDsForFetching) (FetchedIPLDs, error) } // IPLDResolver resolves an IPLD wrapper into chain-specific payloads type IPLDResolver interface { - Resolve(iplds interface{}) (interface{}, error) + Resolve(iplds FetchedIPLDs) (ServerResponse, error) } // ClientSubscription is a general interface for chain data subscriptions @@ -74,3 +80,15 @@ type ClientSubscription interface { type DagPutter interface { DagPut(raw interface{}) ([]string, error) } + +// SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain +// Further specifics of the underlying filter type depend on the internal needs of the types +// which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain +// The underlying type needs to be rlp serializable +type SubscriptionSettings interface { + StartingBlock() *big.Int + EndingBlock() *big.Int + ChainType() config.ChainType + HistoricalData() bool + HistoricalDataOnly() bool +} diff --git a/pkg/super_node/shared/types.go b/pkg/super_node/shared/types.go index 9bfb973d..5045215e 100644 --- a/pkg/super_node/shared/types.go +++ b/pkg/super_node/shared/types.go @@ -16,6 +16,29 @@ package shared +// These types serve as very loose wrappers around a generic underlying interface{} +type RawChainData interface{} + +// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values +// stored at that memory location and not a copy of the pointer itself. +// We want to avoid sending a pointer to publishAndIndex and screenAndServe channels; sharing memory across these processes +type StreamedIPLDs interface { + Value() StreamedIPLDs +} + +type CIDsForIndexing interface{} + +type CIDsForFetching interface{} + +type FetchedIPLDs interface{} + +// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values +// stored at that memory location and not a copy of the pointer itself. +// We want to avoid sending a pointer to subscription channels; sharing memory across all subscriptions +type ServerResponse interface { + Value() ServerResponse +} + type Gap struct { Start uint64 Stop uint64 diff --git a/pkg/super_node/subscription.go b/pkg/super_node/subscription.go index 9669d194..e693eef7 100644 --- a/pkg/super_node/subscription.go +++ b/pkg/super_node/subscription.go @@ -17,35 +17,20 @@ package super_node import ( - "math/big" - "github.com/ethereum/go-ethereum/rpc" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // Subscription holds the information for an individual client subscription to the super node type Subscription struct { ID rpc.ID - PayloadChan chan<- Payload + PayloadChan chan<- SubscriptionPayload QuitChan chan<- bool } -// Payload is the struct for a super node stream payload +// SubscriptionPayload is the struct for a super node stream payload // It carries data of a type specific to the chain being supported/queried and an error message -type Payload struct { - Data interface{} `json:"data"` // e.g. for Ethereum eth.StreamPayload - Err string `json:"err"` -} - -// SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain -// Further specifics of the underlying filter type depend on the internal needs of the types -// which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain -// The underlying type needs to be rlp serializable -type SubscriptionSettings interface { - StartingBlock() *big.Int - EndingBlock() *big.Int - ChainType() config.ChainType - HistoricalData() bool - HistoricalDataOnly() bool +type SubscriptionPayload struct { + Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload + Err string `json:"err"` }