diff --git a/cmd/streamEthSubscribe.go b/cmd/streamEthSubscribe.go index 24c79501..20051b97 100644 --- a/cmd/streamEthSubscribe.go +++ b/cmd/streamEthSubscribe.go @@ -32,7 +32,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/eth/client" "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" ) @@ -55,7 +54,7 @@ func init() { func streamEthSubscription() { // Prep the subscription config/filters to be sent to the server - ethSubConfig, err := config.NewEthSubscriptionConfig() + ethSubConfig, err := eth.NewEthSubscriptionConfig() if err != nil { log.Fatal(err) } diff --git a/cmd/superNode.go b/cmd/superNode.go index d2088583..17251535 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -18,14 +18,12 @@ package cmd import ( "sync" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/rpc" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/vulcanize/vulcanizedb/pkg/super_node" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // superNodeCmd represents the superNode command @@ -85,8 +83,8 @@ func superNode() { wg.Wait() } -func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) { - superNodeConfig, err := config.NewSuperNodeConfig() +func newSuperNode() (super_node.SuperNode, *shared.SuperNodeConfig, error) { + superNodeConfig, err := shared.NewSuperNodeConfig() if err != nil { return nil, nil, err } @@ -97,7 +95,7 @@ func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) { return sn, superNodeConfig, nil } -func startServers(superNode super_node.SuperNode, settings *config.SuperNode) error { +func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConfig) error { _, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs()) if err != nil { return err diff --git a/pkg/super_node/eth/mocks/blockservice.go b/pkg/ipfs/mocks/blockservice.go similarity index 100% rename from pkg/super_node/eth/mocks/blockservice.go rename to pkg/ipfs/mocks/blockservice.go diff --git a/pkg/super_node/eth/mocks/dag_putters.go b/pkg/ipfs/mocks/dag_putters.go similarity index 100% rename from pkg/super_node/eth/mocks/dag_putters.go rename to pkg/ipfs/mocks/dag_putters.go diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index a9ad071b..63b695b6 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -25,7 +25,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -59,7 +58,7 @@ type BackFillService struct { } // NewBackFillService returns a new BackFillInterface -func NewBackFillService(settings *config.SuperNode) (BackFillInterface, error) { +func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, error) { publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) if err != nil { return nil, err diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index c4d539d1..092e664f 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -27,6 +27,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared/mocks" ) var _ = Describe("BackFiller", func() { @@ -43,7 +44,7 @@ var _ = Describe("BackFiller", func() { ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, ReturnErr: nil, } - mockRetriever := &mocks.MockCIDRetriever{ + mockRetriever := &mocks2.MockCIDRetriever{ FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { @@ -51,7 +52,7 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks.StateDiffFetcher{ + mockFetcher := &mocks2.IPLDFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, 101: mocks.MockStateDiffPayload, @@ -97,7 +98,7 @@ var _ = Describe("BackFiller", func() { ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload}, ReturnErr: nil, } - mockRetriever := &mocks.MockCIDRetriever{ + mockRetriever := &mocks2.MockCIDRetriever{ FirstBlockNumberToReturn: 1, GapsToRetrieve: []shared.Gap{ { @@ -105,7 +106,7 @@ var _ = Describe("BackFiller", func() { }, }, } - mockFetcher := &mocks.StateDiffFetcher{ + mockFetcher := &mocks2.IPLDFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 100: mocks.MockStateDiffPayload, }, @@ -147,11 +148,11 @@ var _ = Describe("BackFiller", func() { ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload}, ReturnErr: nil, } - mockRetriever := &mocks.MockCIDRetriever{ + mockRetriever := &mocks2.MockCIDRetriever{ FirstBlockNumberToReturn: 3, GapsToRetrieve: []shared.Gap{}, } - mockFetcher := &mocks.StateDiffFetcher{ + mockFetcher := &mocks2.IPLDFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ 1: mocks.MockStateDiffPayload, 2: mocks.MockStateDiffPayload, diff --git a/pkg/super_node/btc/mocks/converter.go b/pkg/super_node/btc/mocks/converter.go new file mode 100644 index 00000000..8f7f2ccd --- /dev/null +++ b/pkg/super_node/btc/mocks/converter.go @@ -0,0 +1,64 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// PayloadConverter is the underlying struct for the Converter interface +type PayloadConverter struct { + PassedStatediffPayload btc.BlockPayload + ReturnIPLDPayload btc.IPLDPayload + ReturnErr error +} + +// Convert method is used to convert a geth statediff.Payload to a IPLDPayload +func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { + stateDiffPayload, ok := payload.(btc.BlockPayload) + if !ok { + return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload) + } + pc.PassedStatediffPayload = stateDiffPayload + return pc.ReturnIPLDPayload, pc.ReturnErr +} + +// IterativePayloadConverter is the underlying struct for the Converter interface +type IterativePayloadConverter struct { + PassedStatediffPayload []btc.BlockPayload + ReturnIPLDPayload []btc.IPLDPayload + ReturnErr error + iteration int +} + +// Convert method is used to convert a geth statediff.Payload to a IPLDPayload +func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) { + stateDiffPayload, ok := payload.(btc.BlockPayload) + if !ok { + return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload) + } + pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload) + if len(pc.PassedStatediffPayload) < pc.iteration+1 { + return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration) + } + returnPayload := pc.ReturnIPLDPayload[pc.iteration] + pc.iteration++ + return returnPayload, pc.ReturnErr +} diff --git a/pkg/super_node/config/btc_subscription.go b/pkg/super_node/btc/mocks/indexer.go similarity index 50% rename from pkg/super_node/config/btc_subscription.go rename to pkg/super_node/btc/mocks/indexer.go index 50d7e290..699b7076 100644 --- a/pkg/super_node/config/btc_subscription.go +++ b/pkg/super_node/btc/mocks/indexer.go @@ -14,4 +14,30 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package config +package mocks + +import ( + "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" +) + +// CIDIndexer is the underlying struct for the Indexer interface +type CIDIndexer struct { + PassedCIDPayload []*btc.CIDPayload + ReturnErr error +} + +// Index indexes a cidPayload in Postgres +func (repo *CIDIndexer) Index(cids shared.CIDsForIndexing) error { + cidPayload, ok := cids.(*btc.CIDPayload) + if !ok { + return fmt.Errorf("index expected cids type %T got %T", ð.CIDPayload{}, cids) + } + repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload) + return repo.ReturnErr +} diff --git a/pkg/super_node/btc/mocks/publisher.go b/pkg/super_node/btc/mocks/publisher.go new file mode 100644 index 00000000..dc15fe52 --- /dev/null +++ b/pkg/super_node/btc/mocks/publisher.go @@ -0,0 +1,65 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package mocks + +import ( + "fmt" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// IPLDPublisher is the underlying struct for the Publisher interface +type IPLDPublisher struct { + PassedIPLDPayload btc.IPLDPayload + ReturnCIDPayload *btc.CIDPayload + ReturnErr error +} + +// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload +func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(btc.IPLDPayload) + if !ok { + return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload) + } + pub.PassedIPLDPayload = ipldPayload + return pub.ReturnCIDPayload, pub.ReturnErr +} + +// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing +type IterativeIPLDPublisher struct { + PassedIPLDPayload []btc.IPLDPayload + ReturnCIDPayload []*btc.CIDPayload + ReturnErr error + iteration int +} + +// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload +func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) { + ipldPayload, ok := payload.(btc.IPLDPayload) + if !ok { + return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload) + } + pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) + if len(pub.ReturnCIDPayload) < pub.iteration+1 { + return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration) + } + returnPayload := pub.ReturnCIDPayload[pub.iteration] + pub.iteration++ + return returnPayload, pub.ReturnErr +} diff --git a/pkg/super_node/btc/subscription_config.go b/pkg/super_node/btc/subscription_config.go new file mode 100644 index 00000000..8e4a59b2 --- /dev/null +++ b/pkg/super_node/btc/subscription_config.go @@ -0,0 +1,118 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package btc + +import ( + "math/big" + + "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +/* +// HeaderModel is the db model for btc.header_cids table +// TxInput is the db model for btc.tx_inputs table +type TxInput struct { + ID int64 `db:"id"` + TxID int64 `db:"tx_id"` + Index int64 `db:"index"` + TxWitness [][]byte `db:"tx_witness"` + SignatureScript []byte `db:"sig_script"` + PreviousOutPointHash string `db:"outpoint_hash"` + PreviousOutPointIndex uint32 `db:"outpoint_index"` +} + +// TxOutput is the db model for btc.tx_outputs table +type TxOutput struct { + ID int64 `db:"id"` + TxID int64 `db:"tx_id"` + Index int64 `db:"index"` + Value int64 `db:"value"` + PkScript []byte `db:"pk_script"` +} + +*/ +// SubscriptionSettings config is used by a subscriber to specify what bitcoin data to stream from the super node +type SubscriptionSettings struct { + BackFill bool + BackFillOnly bool + Start *big.Int + End *big.Int // set to 0 or a negative value to have no ending block + HeaderFilter HeaderFilter + TxFilter TxFilter +} + +// HeaderFilter contains filter settings for headers +type HeaderFilter struct { + Off bool +} + +// TxFilter contains filter settings for txs +type TxFilter struct { + Off bool + Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to + HasWitness bool + WitnessHashes []string +} + +// Init is used to initialize a EthSubscription struct with env variables +func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { + sc := new(SubscriptionSettings) + // Below default to false, which means we do not backfill by default + sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData") + sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly") + // Below default to 0 + // 0 start means we start at the beginning and 0 end means we continue indefinitely + sc.Start = big.NewInt(viper.GetInt64("superNode.ethSubscription.startingBlock")) + sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock")) + // Below default to false, which means we get all headers and no uncles by default + sc.HeaderFilter = HeaderFilter{ + Off: viper.GetBool("superNode.ethSubscription.off"), + } + // Below defaults to false and two slices of length 0 + // Which means we get all transactions by default + sc.TxFilter = TxFilter{ + Off: viper.GetBool("superNode.ethSubscription.txFilter.off"), + } + return sc, nil +} + +// StartingBlock satisfies the SubscriptionSettings() interface +func (sc *SubscriptionSettings) StartingBlock() *big.Int { + return sc.Start +} + +// EndingBlock satisfies the SubscriptionSettings() interface +func (sc *SubscriptionSettings) EndingBlock() *big.Int { + return sc.End +} + +// HistoricalData satisfies the SubscriptionSettings() interface +func (sc *SubscriptionSettings) HistoricalData() bool { + return sc.BackFill +} + +// HistoricalDataOnly satisfies the SubscriptionSettings() interface +func (sc *SubscriptionSettings) HistoricalDataOnly() bool { + return sc.BackFillOnly +} + +// ChainType satisfies the SubscriptionSettings() interface +func (sc *SubscriptionSettings) ChainType() shared.ChainType { + return shared.Bitcoin +} diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 219c839d..210966b1 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -20,23 +20,20 @@ import ( "fmt" "github.com/btcsuite/btcd/rpcclient" - "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/btc" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // NewResponseFilterer constructs a ResponseFilterer for the provided chain type -func NewResponseFilterer(chain config.ChainType) (shared.ResponseFilterer, error) { +func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewResponseFilterer(), nil default: return nil, fmt.Errorf("invalid chain %T for filterer constructor", chain) @@ -44,11 +41,11 @@ func NewResponseFilterer(chain config.ChainType) (shared.ResponseFilterer, error } // NewCIDIndexer constructs a CIDIndexer for the provided chain type -func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer, error) { +func NewCIDIndexer(chain shared.ChainType, db *postgres.DB) (shared.CIDIndexer, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewCIDIndexer(db), nil - case config.Bitcoin: + case shared.Bitcoin: return btc.NewCIDIndexer(db), nil default: return nil, fmt.Errorf("invalid chain %T for indexer constructor", chain) @@ -56,9 +53,9 @@ func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer, } // NewCIDRetriever constructs a CIDRetriever for the provided chain type -func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriever, error) { +func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewCIDRetriever(db), nil default: return nil, fmt.Errorf("invalid chain %T for retriever constructor", chain) @@ -66,9 +63,9 @@ func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriev } // NewPayloadStreamer constructs a PayloadStreamer for the provided chain type -func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { +func NewPayloadStreamer(chain shared.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: ethClient, ok := client.(core.RPCClient) if !ok { var expectedClientType core.RPCClient @@ -76,10 +73,10 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl } streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize) return eth.NewPayloadStreamer(ethClient), streamChan, nil - case config.Bitcoin: + case shared.Bitcoin: btcClientConn, ok := client.(*rpcclient.ConnConfig) if !ok { - return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client) + return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client shared type %T got %T", rpcclient.ConnConfig{}, client) } streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize) return btc.NewPayloadStreamer(btcClientConn), streamChan, nil @@ -89,9 +86,9 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl } // NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type -func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.PayloadFetcher, error) { +func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.PayloadFetcher, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: batchClient, ok := client.(eth.BatchClient) if !ok { var expectedClient eth.BatchClient @@ -104,11 +101,11 @@ func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.Paylo } // NewPayloadConverter constructs a PayloadConverter for the provided chain type -func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error) { +func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewPayloadConverter(params.MainnetChainConfig), nil - case config.Bitcoin: + case shared.Bitcoin: return btc.NewPayloadConverter(), nil default: return nil, fmt.Errorf("invalid chain %T for converter constructor", chain) @@ -116,9 +113,9 @@ func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error } // NewIPLDFetcher constructs an IPLDFetcher for the provided chain type -func NewIPLDFetcher(chain config.ChainType, ipfsPath string) (shared.IPLDFetcher, error) { +func NewIPLDFetcher(chain shared.ChainType, ipfsPath string) (shared.IPLDFetcher, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewIPLDFetcher(ipfsPath) default: return nil, fmt.Errorf("invalid chain %T for fetcher constructor", chain) @@ -126,11 +123,11 @@ func NewIPLDFetcher(chain config.ChainType, ipfsPath string) (shared.IPLDFetcher } // NewIPLDPublisher constructs an IPLDPublisher for the provided chain type -func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPublisher, error) { +func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPublisher, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewIPLDPublisher(ipfsPath) - case config.Bitcoin: + case shared.Bitcoin: return btc.NewIPLDPublisher(ipfsPath) default: return nil, fmt.Errorf("invalid chain %T for publisher constructor", chain) @@ -138,9 +135,9 @@ func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPubli } // NewIPLDResolver constructs an IPLDResolver for the provided chain type -func NewIPLDResolver(chain config.ChainType) (shared.IPLDResolver, error) { +func NewIPLDResolver(chain shared.ChainType) (shared.IPLDResolver, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: return eth.NewIPLDResolver(), nil default: return nil, fmt.Errorf("invalid chain %T for resolver constructor", chain) @@ -148,9 +145,9 @@ func NewIPLDResolver(chain config.ChainType) (shared.IPLDResolver, error) { } // NewPublicAPI constructs a PublicAPI for the provided chain type -func NewPublicAPI(chain config.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) { +func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) { switch chain { - case config.Ethereum: + case shared.Ethereum: backend, err := eth.NewEthBackend(db, ipfsPath) if err != nil { return rpc.API{}, err diff --git a/pkg/super_node/eth/api.go b/pkg/super_node/eth/api.go index ec712d6d..7418c4d8 100644 --- a/pkg/super_node/eth/api.go +++ b/pkg/super_node/eth/api.go @@ -27,8 +27,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ipfs/go-block-format" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) // APIName is the namespace for the super node's eth api @@ -72,7 +70,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery) topicStrSets[i] = append(topicStrSets[i], topic.String()) } } - filter := config.ReceiptFilter{ + filter := ReceiptFilter{ Contracts: addrStrs, Topics: topicStrSets, } diff --git a/pkg/super_node/eth/backend.go b/pkg/super_node/eth/backend.go index 476745b5..8edc7e2d 100644 --- a/pkg/super_node/eth/backend.go +++ b/pkg/super_node/eth/backend.go @@ -29,7 +29,6 @@ import ( "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) var ( @@ -123,7 +122,7 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log if err != nil { return nil, err } - receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, config.ReceiptFilter{}, 0, &hash, nil) + receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil) if err != nil { if err := tx.Rollback(); err != nil { logrus.Error(err) diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 33c9f918..3fb23016 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -20,14 +20,12 @@ import ( "bytes" "fmt" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // ResponseFilterer satisfies the ResponseFilterer interface for ethereum @@ -40,9 +38,9 @@ func NewResponseFilterer() *ResponseFilterer { // Filter is used to filter through eth data to extract and package requested data into a Payload func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { - ethFilters, ok := filter.(*config.EthSubscription) + ethFilters, ok := filter.(*SubscriptionSettings) if !ok { - return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter) + return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) } ethPayload, ok := payload.(IPLDPayload) if !ok { @@ -76,7 +74,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh return StreamResponse{}, nil } -func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error { if !headerFilter.Off { headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) if err != nil { @@ -104,7 +102,7 @@ func checkRange(start, end, actual int64) bool { return false } -func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) { +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) { trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) if !trxFilter.Off { for i, trx := range payload.Block.Body().Transactions { @@ -139,7 +137,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin return false } -func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error { +func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error { if !receiptFilter.Off { for i, receipt := range payload.Receipts { // topics is always length 4 @@ -223,7 +221,7 @@ func slicesShareString(slice1, slice2 []string) int { return 0 } -func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *StreamResponse, payload IPLDPayload) error { if !stateFilter.Off { response.StateNodesRlp = make(map[common.Hash][]byte) keyFilters := make([]common.Hash, len(stateFilter.Addresses)) @@ -254,7 +252,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { return false } -func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamResponse, payload IPLDPayload) error { +func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *StreamResponse, payload IPLDPayload) error { if !storageFilter.Off { response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte) stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses)) diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index 9ea10c84..da0c664a 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/jmoiron/sqlx" - "github.com/lib/pq" log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" @@ -119,7 +118,7 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error { _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, topic0s, topic1s, topic2s, topic3s) VALUES ($1, $2, $3, $4, $5, $6, $7)`, - txID, cidMeta.CID, cidMeta.Contract, pq.Array(cidMeta.Topic0s), pq.Array(cidMeta.Topic1s), pq.Array(cidMeta.Topic2s), pq.Array(cidMeta.Topic3s)) + txID, cidMeta.CID, cidMeta.Contract, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s) return err } diff --git a/pkg/super_node/eth/ipld_fetcher_test.go b/pkg/super_node/eth/ipld_fetcher_test.go index 1c5869c1..ee0d954c 100644 --- a/pkg/super_node/eth/ipld_fetcher_test.go +++ b/pkg/super_node/eth/ipld_fetcher_test.go @@ -24,8 +24,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" - "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" ) var ( diff --git a/pkg/super_node/eth/models.go b/pkg/super_node/eth/models.go index 0cc931e5..2a9e7309 100644 --- a/pkg/super_node/eth/models.go +++ b/pkg/super_node/eth/models.go @@ -18,6 +18,7 @@ package eth import "github.com/lib/pq" +// HeaderModel is the db model for eth.header_cids type HeaderModel struct { ID int64 `db:"id"` BlockNumber string `db:"block_number"` @@ -27,6 +28,7 @@ type HeaderModel struct { TotalDifficulty string `db:"td"` } +// UncleModel is the db model for eth.uncle_cids type UncleModel struct { ID int64 `db:"id"` HeaderID int64 `db:"header_id"` @@ -35,6 +37,7 @@ type UncleModel struct { CID string `db:"cid"` } +// TxModel is the db model for eth.transaction_cids type TxModel struct { ID int64 `db:"id"` HeaderID int64 `db:"header_id"` @@ -45,6 +48,7 @@ type TxModel struct { Src string `db:"src"` } +// ReceiptModel is the db model for eth.receipt_cids type ReceiptModel struct { ID int64 `db:"id"` TxID int64 `db:"tx_id"` @@ -56,6 +60,7 @@ type ReceiptModel struct { Topic3s pq.StringArray `db:"topic3s"` } +// StateNodeModel is the db model for eth.state_cids type StateNodeModel struct { ID int64 `db:"id"` HeaderID int64 `db:"header_id"` @@ -64,6 +69,7 @@ type StateNodeModel struct { CID string `db:"cid"` } +// StorageNodeModel is the db model for eth.storage_cids type StorageNodeModel struct { ID int64 `db:"id"` StateID int64 `db:"state_id"` @@ -72,6 +78,7 @@ type StorageNodeModel struct { CID string `db:"cid"` } +// StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key type StorageNodeWithStateKeyModel struct { ID int64 `db:"id"` StateID int64 `db:"state_id"` diff --git a/pkg/super_node/eth/publisher_test.go b/pkg/super_node/eth/publisher_test.go index 65d9872f..e7292f49 100644 --- a/pkg/super_node/eth/publisher_test.go +++ b/pkg/super_node/eth/publisher_test.go @@ -21,25 +21,26 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + mocks2 "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" ) var ( - mockHeaderDagPutter *mocks.DagPutter - mockTrxDagPutter *mocks.DagPutter - mockRctDagPutter *mocks.DagPutter - mockStateDagPutter *mocks.MappedDagPutter - mockStorageDagPutter *mocks.DagPutter + mockHeaderDagPutter *mocks2.DagPutter + mockTrxDagPutter *mocks2.DagPutter + mockRctDagPutter *mocks2.DagPutter + mockStateDagPutter *mocks2.MappedDagPutter + mockStorageDagPutter *mocks2.DagPutter ) var _ = Describe("Publisher", func() { BeforeEach(func() { - mockHeaderDagPutter = new(mocks.DagPutter) - mockTrxDagPutter = new(mocks.DagPutter) - mockRctDagPutter = new(mocks.DagPutter) - mockStateDagPutter = new(mocks.MappedDagPutter) - mockStorageDagPutter = new(mocks.DagPutter) + mockHeaderDagPutter = new(mocks2.DagPutter) + mockTrxDagPutter = new(mocks2.DagPutter) + mockRctDagPutter = new(mocks2.DagPutter) + mockStateDagPutter = new(mocks2.MappedDagPutter) + mockStorageDagPutter = new(mocks2.DagPutter) }) Describe("Publish", func() { diff --git a/pkg/super_node/eth/retriever.go b/pkg/super_node/eth/retriever.go index 8b4dc000..12552c16 100644 --- a/pkg/super_node/eth/retriever.go +++ b/pkg/super_node/eth/retriever.go @@ -27,7 +27,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -59,9 +58,9 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { - streamFilter, ok := filter.(*config.EthSubscription) + streamFilter, ok := filter.(*SubscriptionSettings) if !ok { - return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &config.EthSubscription{}, filter) + return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) } log.Debug("retrieving cids") tx, err := ecr.db.Beginx() @@ -173,7 +172,7 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64 // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, blockNumber int64) ([]TxModel, error) { +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) { log.Debug("retrieving transaction cids for block ", blockNumber) args := make([]interface{}, 0, 3) results := make([]TxModel, 0) @@ -196,7 +195,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, b // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided // filter parameters and correspond to the provided tx ids -func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { +func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) id := 1 args := make([]interface{}, 0, 4) @@ -282,7 +281,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFi } // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]StateNodeModel, error) { +func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, blockNumber int64) ([]StateNodeModel, error) { log.Debug("retrieving state cids for block ", blockNumber) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.id, state_cids.header_id, @@ -307,7 +306,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.State } // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters -func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) { +func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) { log.Debug("retrieving storage cids for block ", blockNumber) args := make([]interface{}, 0, 3) pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key, @@ -472,7 +471,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) pgStr := `SELECT * FROM eth.transaction_cids - WHERE eth.transaction_cids.header_id = $1` + WHERE header_id = $1` var txCIDs []TxModel return txCIDs, tx.Select(&txCIDs, pgStr, headerID) } @@ -481,7 +480,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ( func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) pgStr := `SELECT * FROM eth.receipt_cids - WHERE eth.receipt_cids.tx_id = ANY($1::INTEGER[])` + WHERE tx_id = ANY($1::INTEGER[])` var rctCIDs []ReceiptModel return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) } diff --git a/pkg/super_node/eth/retriever_test.go b/pkg/super_node/eth/retriever_test.go index 5311794f..17c1da37 100644 --- a/pkg/super_node/eth/retriever_test.go +++ b/pkg/super_node/eth/retriever_test.go @@ -23,7 +23,6 @@ import ( . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" eth2 "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" @@ -31,175 +30,175 @@ import ( ) var ( - openFilter = &config.EthSubscription{ + openFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{}, - TxFilter: config.TxFilter{}, - ReceiptFilter: config.ReceiptFilter{}, - StateFilter: config.StateFilter{}, - StorageFilter: config.StorageFilter{}, + HeaderFilter: eth.HeaderFilter{}, + TxFilter: eth.TxFilter{}, + ReceiptFilter: eth.ReceiptFilter{}, + StateFilter: eth.StateFilter{}, + StorageFilter: eth.StorageFilter{}, } - rctContractFilter = &config.EthSubscription{ + rctContractFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Contracts: []string{mocks.AnotherAddress.String()}, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctTopicsFilter = &config.EthSubscription{ + rctTopicsFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000004"}}, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctTopicsAndContractFilter = &config.EthSubscription{ + rctTopicsAndContractFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Topics: [][]string{ {"0x0000000000000000000000000000000000000000000000000000000000000004"}, {"0x0000000000000000000000000000000000000000000000000000000000000006"}, }, Contracts: []string{mocks.Address.String()}, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctTopicsAndContractFilterFail = &config.EthSubscription{ + rctTopicsAndContractFilterFail = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Topics: [][]string{ {"0x0000000000000000000000000000000000000000000000000000000000000004"}, {"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt }, Contracts: []string{mocks.Address.String()}, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctContractsAndTopicFilter = &config.EthSubscription{ + rctContractsAndTopicFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}}, Contracts: []string{mocks.Address.String(), mocks.AnotherAddress.String()}, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctsForAllCollectedTrxs = &config.EthSubscription{ + rctsForAllCollectedTrxs = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter - ReceiptFilter: config.ReceiptFilter{ + TxFilter: eth.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter + ReceiptFilter: eth.ReceiptFilter{ MatchTxs: true, Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - rctsForSelectCollectedTrxs = &config.EthSubscription{ + rctsForSelectCollectedTrxs = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Dst: []string{mocks.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ MatchTxs: true, Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Off: true, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } - stateFilter = &config.EthSubscription{ + stateFilter = ð.SubscriptionSettings{ Start: big.NewInt(0), End: big.NewInt(1), - HeaderFilter: config.HeaderFilter{ + HeaderFilter: eth.HeaderFilter{ Off: true, }, - TxFilter: config.TxFilter{ + TxFilter: eth.TxFilter{ Off: true, }, - ReceiptFilter: config.ReceiptFilter{ + ReceiptFilter: eth.ReceiptFilter{ Off: true, }, - StateFilter: config.StateFilter{ + StateFilter: eth.StateFilter{ Addresses: []string{mocks.Address.Hex()}, }, - StorageFilter: config.StorageFilter{ + StorageFilter: eth.StorageFilter{ Off: true, }, } diff --git a/pkg/super_node/config/eth_subscription.go b/pkg/super_node/eth/subscription_config.go similarity index 88% rename from pkg/super_node/config/eth_subscription.go rename to pkg/super_node/eth/subscription_config.go index 3108990a..368d461a 100644 --- a/pkg/super_node/config/eth_subscription.go +++ b/pkg/super_node/eth/subscription_config.go @@ -14,17 +14,19 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package config +package eth import ( "errors" "math/big" "github.com/spf13/viper" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// EthSubscription config is used by a subscriber to specify what eth data to stream from the super node -type EthSubscription struct { +// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the super node +type SubscriptionSettings struct { BackFill bool BackFillOnly bool Start *big.Int @@ -73,8 +75,8 @@ type StorageFilter struct { } // Init is used to initialize a EthSubscription struct with env variables -func NewEthSubscriptionConfig() (*EthSubscription, error) { - sc := new(EthSubscription) +func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { + sc := new(SubscriptionSettings) // Below default to false, which means we do not backfill by default sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData") sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly") @@ -126,26 +128,26 @@ func NewEthSubscriptionConfig() (*EthSubscription, error) { } // StartingBlock satisfies the SubscriptionSettings() interface -func (sc *EthSubscription) StartingBlock() *big.Int { +func (sc *SubscriptionSettings) StartingBlock() *big.Int { return sc.Start } // EndingBlock satisfies the SubscriptionSettings() interface -func (sc *EthSubscription) EndingBlock() *big.Int { +func (sc *SubscriptionSettings) EndingBlock() *big.Int { return sc.End } // HistoricalData satisfies the SubscriptionSettings() interface -func (sc *EthSubscription) HistoricalData() bool { +func (sc *SubscriptionSettings) HistoricalData() bool { return sc.BackFill } // HistoricalDataOnly satisfies the SubscriptionSettings() interface -func (sc *EthSubscription) HistoricalDataOnly() bool { +func (sc *SubscriptionSettings) HistoricalDataOnly() bool { return sc.BackFillOnly } // ChainType satisfies the SubscriptionSettings() interface -func (sc *EthSubscription) ChainType() ChainType { - return Ethereum +func (sc *SubscriptionSettings) ChainType() shared.ChainType { + return shared.Ethereum } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index af0bdb42..c650533f 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -31,7 +31,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/eth/core" "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) @@ -90,7 +89,7 @@ type Service struct { // Number of publishAndIndex workers WorkerPoolSize int // chain type for this service - chain config.ChainType + chain shared.ChainType // Path to ipfs data dir ipfsPath string // Underlying db @@ -98,7 +97,7 @@ type Service struct { } // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct -func NewSuperNode(settings *config.SuperNode) (SuperNode, error) { +func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) { if err := ipfs.InitIPFSPlugins(); err != nil { return nil, err } diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index 14c39001..20deaa32 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -20,15 +20,14 @@ import ( "sync" "time" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/super_node" - mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared/mocks" ) var _ = Describe("Service", func() { @@ -37,22 +36,22 @@ var _ = Describe("Service", func() { wg := new(sync.WaitGroup) payloadChan := make(chan shared.RawChainData, 1) quitChan := make(chan bool, 1) - mockCidIndexer := &mocks2.CIDIndexer{ + mockCidIndexer := &mocks.CIDIndexer{ ReturnErr: nil, } - mockPublisher := &mocks2.IPLDPublisher{ - ReturnCIDPayload: mocks2.MockCIDPayload, + mockPublisher := &mocks.IPLDPublisher{ + ReturnCIDPayload: mocks.MockCIDPayload, ReturnErr: nil, } - mockStreamer := &mocks2.StateDiffStreamer{ + mockStreamer := &mocks2.PayloadStreamer{ ReturnSub: &rpc.ClientSubscription{}, - StreamPayloads: []statediff.Payload{ - mocks2.MockStateDiffPayload, + StreamPayloads: []shared.RawChainData{ + mocks.MockStateDiffPayload, }, ReturnErr: nil, } - mockConverter := &mocks2.PayloadConverter{ - ReturnIPLDPayload: mocks2.MockIPLDPayload, + mockConverter := &mocks.PayloadConverter{ + ReturnIPLDPayload: mocks.MockIPLDPayload, ReturnErr: nil, } processor := &super_node.Service{ @@ -69,10 +68,10 @@ var _ = Describe("Service", func() { time.Sleep(2 * time.Second) quitChan <- true wg.Wait() - Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks2.MockStateDiffPayload)) + Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1)) - Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks2.MockCIDPayload)) - Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks2.MockIPLDPayload)) + Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) + Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) }) }) diff --git a/pkg/super_node/config/chain_type.go b/pkg/super_node/shared/chain_type.go similarity index 98% rename from pkg/super_node/config/chain_type.go rename to pkg/super_node/shared/chain_type.go index 3b4305a9..3aad798c 100644 --- a/pkg/super_node/config/chain_type.go +++ b/pkg/super_node/shared/chain_type.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package config +package shared import ( "errors" diff --git a/pkg/super_node/config/config.go b/pkg/super_node/shared/config.go similarity index 95% rename from pkg/super_node/config/config.go rename to pkg/super_node/shared/config.go index 75d1a246..5cc9d1e6 100644 --- a/pkg/super_node/config/config.go +++ b/pkg/super_node/shared/config.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -package config +package shared import ( "fmt" @@ -36,8 +36,8 @@ import ( "github.com/vulcanize/vulcanizedb/utils" ) -// SuperNode config struct -type SuperNode struct { +// SuperNodeConfig struct +type SuperNodeConfig struct { // Ubiquitous fields Chain ChainType IPFSPath string @@ -62,8 +62,8 @@ type SuperNode struct { } // NewSuperNodeConfig is used to initialize a SuperNode config from a config .toml file -func NewSuperNodeConfig() (*SuperNode, error) { - sn := new(SuperNode) +func NewSuperNodeConfig() (*SuperNodeConfig, error) { + sn := new(SuperNodeConfig) sn.DBConfig = config.Database{ Name: viper.GetString("superNode.database.name"), Hostname: viper.GetString("superNode.database.hostname"), @@ -128,7 +128,7 @@ func NewSuperNodeConfig() (*SuperNode, error) { } // BackFillFields is used to fill in the BackFill fields of the config -func (sn *SuperNode) BackFillFields() error { +func (sn *SuperNodeConfig) BackFillFields() error { sn.BackFill = true _, httpClient, err := getNodeAndClient(sn.Chain, viper.GetString("superNode.backFill.httpPath")) if err != nil { diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 80945c97..2974b7b1 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -18,8 +18,6 @@ package shared import ( "math/big" - - "github.com/vulcanize/vulcanizedb/pkg/super_node/config" ) // PayloadStreamer streams chain-specific payloads to the provided channel @@ -88,7 +86,7 @@ type DagPutter interface { type SubscriptionSettings interface { StartingBlock() *big.Int EndingBlock() *big.Int - ChainType() config.ChainType + ChainType() ChainType HistoricalData() bool HistoricalDataOnly() bool } diff --git a/pkg/super_node/eth/mocks/fetcher.go b/pkg/super_node/shared/mocks/fetcher.go similarity index 88% rename from pkg/super_node/eth/mocks/fetcher.go rename to pkg/super_node/shared/mocks/fetcher.go index dd9bc3cb..e9d6cbfa 100644 --- a/pkg/super_node/eth/mocks/fetcher.go +++ b/pkg/super_node/shared/mocks/fetcher.go @@ -23,16 +23,16 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// StateDiffFetcher mock for tests -type StateDiffFetcher struct { +// IPLDFetcher mock for tests +type IPLDFetcher struct { PayloadsToReturn map[uint64]shared.RawChainData FetchErrs map[uint64]error CalledAtBlockHeights [][]uint64 CalledTimes int64 } -// FetchStateDiffsAt mock method -func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { +// FetchAt mock method +func (fetcher *IPLDFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { if fetcher.PayloadsToReturn == nil { return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return") } diff --git a/pkg/super_node/eth/mocks/retriever.go b/pkg/super_node/shared/mocks/retriever.go similarity index 100% rename from pkg/super_node/eth/mocks/retriever.go rename to pkg/super_node/shared/mocks/retriever.go diff --git a/pkg/super_node/eth/mocks/streamer.go b/pkg/super_node/shared/mocks/streamer.go similarity index 74% rename from pkg/super_node/eth/mocks/streamer.go rename to pkg/super_node/shared/mocks/streamer.go index a4e49aad..6b9d2774 100644 --- a/pkg/super_node/eth/mocks/streamer.go +++ b/pkg/super_node/shared/mocks/streamer.go @@ -18,20 +18,19 @@ package mocks import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// StateDiffStreamer is the underlying struct for the Streamer interface -type StateDiffStreamer struct { +// PayloadStreamer mock struct +type PayloadStreamer struct { PassedPayloadChan chan shared.RawChainData ReturnSub *rpc.ClientSubscription ReturnErr error - StreamPayloads []statediff.Payload + StreamPayloads []shared.RawChainData } -// Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *StateDiffStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { +// Stream mock method +func (sds *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { sds.PassedPayloadChan = payloadChan go func() {