btc mocks; reorg

This commit is contained in:
Ian Norden 2020-02-03 12:22:29 -06:00
parent 808f1b5662
commit 48f70d4ddf
29 changed files with 452 additions and 188 deletions

View File

@ -32,7 +32,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)
@ -55,7 +54,7 @@ func init() {
func streamEthSubscription() {
// Prep the subscription config/filters to be sent to the server
ethSubConfig, err := config.NewEthSubscriptionConfig()
ethSubConfig, err := eth.NewEthSubscriptionConfig()
if err != nil {
log.Fatal(err)
}

View File

@ -18,14 +18,12 @@ package cmd
import (
"sync"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// superNodeCmd represents the superNode command
@ -85,8 +83,8 @@ func superNode() {
wg.Wait()
}
func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) {
superNodeConfig, err := config.NewSuperNodeConfig()
func newSuperNode() (super_node.SuperNode, *shared.SuperNodeConfig, error) {
superNodeConfig, err := shared.NewSuperNodeConfig()
if err != nil {
return nil, nil, err
}
@ -97,7 +95,7 @@ func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) {
return sn, superNodeConfig, nil
}
func startServers(superNode super_node.SuperNode, settings *config.SuperNode) error {
func startServers(superNode super_node.SuperNode, settings *shared.SuperNodeConfig) error {
_, _, err := rpc.StartIPCEndpoint(settings.IPCEndpoint, superNode.APIs())
if err != nil {
return err

View File

@ -25,7 +25,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
@ -59,7 +58,7 @@ type BackFillService struct {
}
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *config.SuperNode) (BackFillInterface, error) {
func NewBackFillService(settings *shared.SuperNodeConfig) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath)
if err != nil {
return nil, err

View File

@ -27,6 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared/mocks"
)
var _ = Describe("BackFiller", func() {
@ -43,7 +44,7 @@ var _ = Describe("BackFiller", func() {
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
mockRetriever := &mocks2.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
GapsToRetrieve: []shared.Gap{
{
@ -51,7 +52,7 @@ var _ = Describe("BackFiller", func() {
},
},
}
mockFetcher := &mocks.StateDiffFetcher{
mockFetcher := &mocks2.IPLDFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
101: mocks.MockStateDiffPayload,
@ -97,7 +98,7 @@ var _ = Describe("BackFiller", func() {
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
mockRetriever := &mocks2.MockCIDRetriever{
FirstBlockNumberToReturn: 1,
GapsToRetrieve: []shared.Gap{
{
@ -105,7 +106,7 @@ var _ = Describe("BackFiller", func() {
},
},
}
mockFetcher := &mocks.StateDiffFetcher{
mockFetcher := &mocks2.IPLDFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
},
@ -147,11 +148,11 @@ var _ = Describe("BackFiller", func() {
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
mockRetriever := &mocks2.MockCIDRetriever{
FirstBlockNumberToReturn: 3,
GapsToRetrieve: []shared.Gap{},
}
mockFetcher := &mocks.StateDiffFetcher{
mockFetcher := &mocks2.IPLDFetcher{
PayloadsToReturn: map[uint64]shared.RawChainData{
1: mocks.MockStateDiffPayload,
2: mocks.MockStateDiffPayload,

View File

@ -0,0 +1,64 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// PayloadConverter is the underlying struct for the Converter interface
type PayloadConverter struct {
PassedStatediffPayload btc.BlockPayload
ReturnIPLDPayload btc.IPLDPayload
ReturnErr error
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
stateDiffPayload, ok := payload.(btc.BlockPayload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload)
}
pc.PassedStatediffPayload = stateDiffPayload
return pc.ReturnIPLDPayload, pc.ReturnErr
}
// IterativePayloadConverter is the underlying struct for the Converter interface
type IterativePayloadConverter struct {
PassedStatediffPayload []btc.BlockPayload
ReturnIPLDPayload []btc.IPLDPayload
ReturnErr error
iteration int
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
stateDiffPayload, ok := payload.(btc.BlockPayload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", btc.BlockPayload{}, payload)
}
pc.PassedStatediffPayload = append(pc.PassedStatediffPayload, stateDiffPayload)
if len(pc.PassedStatediffPayload) < pc.iteration+1 {
return nil, fmt.Errorf("IterativePayloadConverter does not have a payload to return at iteration %d", pc.iteration)
}
returnPayload := pc.ReturnIPLDPayload[pc.iteration]
pc.iteration++
return returnPayload, pc.ReturnErr
}

View File

@ -14,4 +14,30 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package config
package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)
// CIDIndexer is the underlying struct for the Indexer interface
type CIDIndexer struct {
PassedCIDPayload []*btc.CIDPayload
ReturnErr error
}
// Index indexes a cidPayload in Postgres
func (repo *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
cidPayload, ok := cids.(*btc.CIDPayload)
if !ok {
return fmt.Errorf("index expected cids type %T got %T", &eth.CIDPayload{}, cids)
}
repo.PassedCIDPayload = append(repo.PassedCIDPayload, cidPayload)
return repo.ReturnErr
}

View File

@ -0,0 +1,65 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// IPLDPublisher is the underlying struct for the Publisher interface
type IPLDPublisher struct {
PassedIPLDPayload btc.IPLDPayload
ReturnCIDPayload *btc.CIDPayload
ReturnErr error
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(btc.IPLDPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload)
}
pub.PassedIPLDPayload = ipldPayload
return pub.ReturnCIDPayload, pub.ReturnErr
}
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
type IterativeIPLDPublisher struct {
PassedIPLDPayload []btc.IPLDPayload
ReturnCIDPayload []*btc.CIDPayload
ReturnErr error
iteration int
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(btc.IPLDPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &btc.IPLDPayload{}, payload)
}
pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload)
if len(pub.ReturnCIDPayload) < pub.iteration+1 {
return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration)
}
returnPayload := pub.ReturnCIDPayload[pub.iteration]
pub.iteration++
return returnPayload, pub.ReturnErr
}

View File

@ -0,0 +1,118 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"math/big"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
/*
// HeaderModel is the db model for btc.header_cids table
// TxInput is the db model for btc.tx_inputs table
type TxInput struct {
ID int64 `db:"id"`
TxID int64 `db:"tx_id"`
Index int64 `db:"index"`
TxWitness [][]byte `db:"tx_witness"`
SignatureScript []byte `db:"sig_script"`
PreviousOutPointHash string `db:"outpoint_hash"`
PreviousOutPointIndex uint32 `db:"outpoint_index"`
}
// TxOutput is the db model for btc.tx_outputs table
type TxOutput struct {
ID int64 `db:"id"`
TxID int64 `db:"tx_id"`
Index int64 `db:"index"`
Value int64 `db:"value"`
PkScript []byte `db:"pk_script"`
}
*/
// SubscriptionSettings config is used by a subscriber to specify what bitcoin data to stream from the super node
type SubscriptionSettings struct {
BackFill bool
BackFillOnly bool
Start *big.Int
End *big.Int // set to 0 or a negative value to have no ending block
HeaderFilter HeaderFilter
TxFilter TxFilter
}
// HeaderFilter contains filter settings for headers
type HeaderFilter struct {
Off bool
}
// TxFilter contains filter settings for txs
type TxFilter struct {
Off bool
Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to
HasWitness bool
WitnessHashes []string
}
// Init is used to initialize a EthSubscription struct with env variables
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly")
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
sc.Start = big.NewInt(viper.GetInt64("superNode.ethSubscription.startingBlock"))
sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock"))
// Below default to false, which means we get all headers and no uncles by default
sc.HeaderFilter = HeaderFilter{
Off: viper.GetBool("superNode.ethSubscription.off"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
sc.TxFilter = TxFilter{
Off: viper.GetBool("superNode.ethSubscription.txFilter.off"),
}
return sc, nil
}
// StartingBlock satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) StartingBlock() *big.Int {
return sc.Start
}
// EndingBlock satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) EndingBlock() *big.Int {
return sc.End
}
// HistoricalData satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) HistoricalData() bool {
return sc.BackFill
}
// HistoricalDataOnly satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) HistoricalDataOnly() bool {
return sc.BackFillOnly
}
// ChainType satisfies the SubscriptionSettings() interface
func (sc *SubscriptionSettings) ChainType() shared.ChainType {
return shared.Bitcoin
}

View File

@ -20,23 +20,20 @@ import (
"fmt"
"github.com/btcsuite/btcd/rpcclient"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/btc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// NewResponseFilterer constructs a ResponseFilterer for the provided chain type
func NewResponseFilterer(chain config.ChainType) (shared.ResponseFilterer, error) {
func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewResponseFilterer(), nil
default:
return nil, fmt.Errorf("invalid chain %T for filterer constructor", chain)
@ -44,11 +41,11 @@ func NewResponseFilterer(chain config.ChainType) (shared.ResponseFilterer, error
}
// NewCIDIndexer constructs a CIDIndexer for the provided chain type
func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer, error) {
func NewCIDIndexer(chain shared.ChainType, db *postgres.DB) (shared.CIDIndexer, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewCIDIndexer(db), nil
case config.Bitcoin:
case shared.Bitcoin:
return btc.NewCIDIndexer(db), nil
default:
return nil, fmt.Errorf("invalid chain %T for indexer constructor", chain)
@ -56,9 +53,9 @@ func NewCIDIndexer(chain config.ChainType, db *postgres.DB) (shared.CIDIndexer,
}
// NewCIDRetriever constructs a CIDRetriever for the provided chain type
func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriever, error) {
func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewCIDRetriever(db), nil
default:
return nil, fmt.Errorf("invalid chain %T for retriever constructor", chain)
@ -66,9 +63,9 @@ func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriev
}
// NewPayloadStreamer constructs a PayloadStreamer for the provided chain type
func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) {
func NewPayloadStreamer(chain shared.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
ethClient, ok := client.(core.RPCClient)
if !ok {
var expectedClientType core.RPCClient
@ -76,10 +73,10 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl
}
streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize)
return eth.NewPayloadStreamer(ethClient), streamChan, nil
case config.Bitcoin:
case shared.Bitcoin:
btcClientConn, ok := client.(*rpcclient.ConnConfig)
if !ok {
return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client)
return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client shared type %T got %T", rpcclient.ConnConfig{}, client)
}
streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize)
return btc.NewPayloadStreamer(btcClientConn), streamChan, nil
@ -89,9 +86,9 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl
}
// NewPaylaodFetcher constructs a PayloadFetcher for the provided chain type
func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.PayloadFetcher, error) {
func NewPaylaodFetcher(chain shared.ChainType, client interface{}) (shared.PayloadFetcher, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
batchClient, ok := client.(eth.BatchClient)
if !ok {
var expectedClient eth.BatchClient
@ -104,11 +101,11 @@ func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.Paylo
}
// NewPayloadConverter constructs a PayloadConverter for the provided chain type
func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error) {
func NewPayloadConverter(chain shared.ChainType) (shared.PayloadConverter, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewPayloadConverter(params.MainnetChainConfig), nil
case config.Bitcoin:
case shared.Bitcoin:
return btc.NewPayloadConverter(), nil
default:
return nil, fmt.Errorf("invalid chain %T for converter constructor", chain)
@ -116,9 +113,9 @@ func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error
}
// NewIPLDFetcher constructs an IPLDFetcher for the provided chain type
func NewIPLDFetcher(chain config.ChainType, ipfsPath string) (shared.IPLDFetcher, error) {
func NewIPLDFetcher(chain shared.ChainType, ipfsPath string) (shared.IPLDFetcher, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewIPLDFetcher(ipfsPath)
default:
return nil, fmt.Errorf("invalid chain %T for fetcher constructor", chain)
@ -126,11 +123,11 @@ func NewIPLDFetcher(chain config.ChainType, ipfsPath string) (shared.IPLDFetcher
}
// NewIPLDPublisher constructs an IPLDPublisher for the provided chain type
func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPublisher, error) {
func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPublisher, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewIPLDPublisher(ipfsPath)
case config.Bitcoin:
case shared.Bitcoin:
return btc.NewIPLDPublisher(ipfsPath)
default:
return nil, fmt.Errorf("invalid chain %T for publisher constructor", chain)
@ -138,9 +135,9 @@ func NewIPLDPublisher(chain config.ChainType, ipfsPath string) (shared.IPLDPubli
}
// NewIPLDResolver constructs an IPLDResolver for the provided chain type
func NewIPLDResolver(chain config.ChainType) (shared.IPLDResolver, error) {
func NewIPLDResolver(chain shared.ChainType) (shared.IPLDResolver, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
return eth.NewIPLDResolver(), nil
default:
return nil, fmt.Errorf("invalid chain %T for resolver constructor", chain)
@ -148,9 +145,9 @@ func NewIPLDResolver(chain config.ChainType) (shared.IPLDResolver, error) {
}
// NewPublicAPI constructs a PublicAPI for the provided chain type
func NewPublicAPI(chain config.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) {
func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc.API, error) {
switch chain {
case config.Ethereum:
case shared.Ethereum:
backend, err := eth.NewEthBackend(db, ipfsPath)
if err != nil {
return rpc.API{}, err

View File

@ -27,8 +27,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ipfs/go-block-format"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
)
// APIName is the namespace for the super node's eth api
@ -72,7 +70,7 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
topicStrSets[i] = append(topicStrSets[i], topic.String())
}
}
filter := config.ReceiptFilter{
filter := ReceiptFilter{
Contracts: addrStrs,
Topics: topicStrSets,
}

View File

@ -29,7 +29,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
)
var (
@ -123,7 +122,7 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
if err != nil {
return nil, err
}
receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, config.ReceiptFilter{}, 0, &hash, nil)
receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)

View File

@ -20,14 +20,12 @@ import (
"bytes"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// ResponseFilterer satisfies the ResponseFilterer interface for ethereum
@ -40,9 +38,9 @@ func NewResponseFilterer() *ResponseFilterer {
// Filter is used to filter through eth data to extract and package requested data into a Payload
func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) {
ethFilters, ok := filter.(*config.EthSubscription)
ethFilters, ok := filter.(*SubscriptionSettings)
if !ok {
return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter)
return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
ethPayload, ok := payload.(IPLDPayload)
if !ok {
@ -76,7 +74,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh
return StreamResponse{}, nil
}
func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamResponse, payload IPLDPayload) error {
func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error {
if !headerFilter.Off {
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
if err != nil {
@ -104,7 +102,7 @@ func checkRange(start, end, actual int64) bool {
return false
}
func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) {
func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions))
if !trxFilter.Off {
for i, trx := range payload.Block.Body().Transactions {
@ -139,7 +137,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false
}
func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error {
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off {
for i, receipt := range payload.Receipts {
// topics is always length 4
@ -223,7 +221,7 @@ func slicesShareString(slice1, slice2 []string) int {
return 0
}
func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamResponse, payload IPLDPayload) error {
func (s *ResponseFilterer) filterState(stateFilter StateFilter, response *StreamResponse, payload IPLDPayload) error {
if !stateFilter.Off {
response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, len(stateFilter.Addresses))
@ -254,7 +252,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false
}
func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamResponse, payload IPLDPayload) error {
func (s *ResponseFilterer) filterStorage(storageFilter StorageFilter, response *StreamResponse, payload IPLDPayload) error {
if !storageFilter.Off {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses))

View File

@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
@ -119,7 +118,7 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa
func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error {
_, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, topic0s, topic1s, topic2s, topic3s) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
txID, cidMeta.CID, cidMeta.Contract, pq.Array(cidMeta.Topic0s), pq.Array(cidMeta.Topic1s), pq.Array(cidMeta.Topic2s), pq.Array(cidMeta.Topic3s))
txID, cidMeta.CID, cidMeta.Contract, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s)
return err
}

View File

@ -24,8 +24,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
)
var (

View File

@ -18,6 +18,7 @@ package eth
import "github.com/lib/pq"
// HeaderModel is the db model for eth.header_cids
type HeaderModel struct {
ID int64 `db:"id"`
BlockNumber string `db:"block_number"`
@ -27,6 +28,7 @@ type HeaderModel struct {
TotalDifficulty string `db:"td"`
}
// UncleModel is the db model for eth.uncle_cids
type UncleModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
@ -35,6 +37,7 @@ type UncleModel struct {
CID string `db:"cid"`
}
// TxModel is the db model for eth.transaction_cids
type TxModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
@ -45,6 +48,7 @@ type TxModel struct {
Src string `db:"src"`
}
// ReceiptModel is the db model for eth.receipt_cids
type ReceiptModel struct {
ID int64 `db:"id"`
TxID int64 `db:"tx_id"`
@ -56,6 +60,7 @@ type ReceiptModel struct {
Topic3s pq.StringArray `db:"topic3s"`
}
// StateNodeModel is the db model for eth.state_cids
type StateNodeModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
@ -64,6 +69,7 @@ type StateNodeModel struct {
CID string `db:"cid"`
}
// StorageNodeModel is the db model for eth.storage_cids
type StorageNodeModel struct {
ID int64 `db:"id"`
StateID int64 `db:"state_id"`
@ -72,6 +78,7 @@ type StorageNodeModel struct {
CID string `db:"cid"`
}
// StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key
type StorageNodeWithStateKeyModel struct {
ID int64 `db:"id"`
StateID int64 `db:"state_id"`

View File

@ -21,25 +21,26 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
mocks2 "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
)
var (
mockHeaderDagPutter *mocks.DagPutter
mockTrxDagPutter *mocks.DagPutter
mockRctDagPutter *mocks.DagPutter
mockStateDagPutter *mocks.MappedDagPutter
mockStorageDagPutter *mocks.DagPutter
mockHeaderDagPutter *mocks2.DagPutter
mockTrxDagPutter *mocks2.DagPutter
mockRctDagPutter *mocks2.DagPutter
mockStateDagPutter *mocks2.MappedDagPutter
mockStorageDagPutter *mocks2.DagPutter
)
var _ = Describe("Publisher", func() {
BeforeEach(func() {
mockHeaderDagPutter = new(mocks.DagPutter)
mockTrxDagPutter = new(mocks.DagPutter)
mockRctDagPutter = new(mocks.DagPutter)
mockStateDagPutter = new(mocks.MappedDagPutter)
mockStorageDagPutter = new(mocks.DagPutter)
mockHeaderDagPutter = new(mocks2.DagPutter)
mockTrxDagPutter = new(mocks2.DagPutter)
mockRctDagPutter = new(mocks2.DagPutter)
mockStateDagPutter = new(mocks2.MappedDagPutter)
mockStorageDagPutter = new(mocks2.DagPutter)
})
Describe("Publish", func() {

View File

@ -27,7 +27,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
@ -59,9 +58,9 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
streamFilter, ok := filter.(*config.EthSubscription)
streamFilter, ok := filter.(*SubscriptionSettings)
if !ok {
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &config.EthSubscription{}, filter)
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
log.Debug("retrieving cids")
tx, err := ecr.db.Beginx()
@ -173,7 +172,7 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, blockNumber int64) ([]TxModel, error) {
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for block ", blockNumber)
args := make([]interface{}, 0, 3)
results := make([]TxModel, 0)
@ -196,7 +195,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, b
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) {
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
id := 1
args := make([]interface{}, 0, 4)
@ -282,7 +281,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter config.ReceiptFi
}
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.StateFilter, blockNumber int64) ([]StateNodeModel, error) {
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, blockNumber int64) ([]StateNodeModel, error) {
log.Debug("retrieving state cids for block ", blockNumber)
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id,
@ -307,7 +306,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter config.State
}
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided blockheight that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter config.StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) {
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, blockNumber int64) ([]StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for block ", blockNumber)
args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_key,
@ -472,7 +471,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT * FROM eth.transaction_cids
WHERE eth.transaction_cids.header_id = $1`
WHERE header_id = $1`
var txCIDs []TxModel
return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
}
@ -481,7 +480,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) (
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT * FROM eth.receipt_cids
WHERE eth.receipt_cids.tx_id = ANY($1::INTEGER[])`
WHERE tx_id = ANY($1::INTEGER[])`
var rctCIDs []ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
}

View File

@ -23,7 +23,6 @@ import (
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
eth2 "github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
@ -31,175 +30,175 @@ import (
)
var (
openFilter = &config.EthSubscription{
openFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{},
TxFilter: config.TxFilter{},
ReceiptFilter: config.ReceiptFilter{},
StateFilter: config.StateFilter{},
StorageFilter: config.StorageFilter{},
HeaderFilter: eth.HeaderFilter{},
TxFilter: eth.TxFilter{},
ReceiptFilter: eth.ReceiptFilter{},
StateFilter: eth.StateFilter{},
StorageFilter: eth.StorageFilter{},
}
rctContractFilter = &config.EthSubscription{
rctContractFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Contracts: []string{mocks.AnotherAddress.String()},
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsFilter = &config.EthSubscription{
rctTopicsFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000004"}},
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsAndContractFilter = &config.EthSubscription{
rctTopicsAndContractFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{
{"0x0000000000000000000000000000000000000000000000000000000000000004"},
{"0x0000000000000000000000000000000000000000000000000000000000000006"},
},
Contracts: []string{mocks.Address.String()},
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctTopicsAndContractFilterFail = &config.EthSubscription{
rctTopicsAndContractFilterFail = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{
{"0x0000000000000000000000000000000000000000000000000000000000000004"},
{"0x0000000000000000000000000000000000000000000000000000000000000007"}, // This topic won't match on the mocks.Address.String() contract receipt
},
Contracts: []string{mocks.Address.String()},
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctContractsAndTopicFilter = &config.EthSubscription{
rctContractsAndTopicFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000005"}},
Contracts: []string{mocks.Address.String(), mocks.AnotherAddress.String()},
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctsForAllCollectedTrxs = &config.EthSubscription{
rctsForAllCollectedTrxs = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter
ReceiptFilter: config.ReceiptFilter{
TxFilter: eth.TxFilter{}, // Trx filter open so we will collect all trxs, therefore we will also collect all corresponding rcts despite rct filter
ReceiptFilter: eth.ReceiptFilter{
MatchTxs: true,
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have
Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
rctsForSelectCollectedTrxs = &config.EthSubscription{
rctsForSelectCollectedTrxs = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Dst: []string{mocks.AnotherAddress.String()}, // We only filter for one of the trxs so we will only get the one corresponding receipt
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
MatchTxs: true,
Topics: [][]string{{"0x0000000000000000000000000000000000000000000000000000000000000006"}}, // Topic0 isn't one of the topic0s we have
Contracts: []string{"0x0000000000000000000000000000000000000002"}, // Contract isn't one of the contracts we have
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Off: true,
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}
stateFilter = &config.EthSubscription{
stateFilter = &eth.SubscriptionSettings{
Start: big.NewInt(0),
End: big.NewInt(1),
HeaderFilter: config.HeaderFilter{
HeaderFilter: eth.HeaderFilter{
Off: true,
},
TxFilter: config.TxFilter{
TxFilter: eth.TxFilter{
Off: true,
},
ReceiptFilter: config.ReceiptFilter{
ReceiptFilter: eth.ReceiptFilter{
Off: true,
},
StateFilter: config.StateFilter{
StateFilter: eth.StateFilter{
Addresses: []string{mocks.Address.Hex()},
},
StorageFilter: config.StorageFilter{
StorageFilter: eth.StorageFilter{
Off: true,
},
}

View File

@ -14,17 +14,19 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package config
package eth
import (
"errors"
"math/big"
"github.com/spf13/viper"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// EthSubscription config is used by a subscriber to specify what eth data to stream from the super node
type EthSubscription struct {
// SubscriptionSettings config is used by a subscriber to specify what eth data to stream from the super node
type SubscriptionSettings struct {
BackFill bool
BackFillOnly bool
Start *big.Int
@ -73,8 +75,8 @@ type StorageFilter struct {
}
// Init is used to initialize a EthSubscription struct with env variables
func NewEthSubscriptionConfig() (*EthSubscription, error) {
sc := new(EthSubscription)
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData")
sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly")
@ -126,26 +128,26 @@ func NewEthSubscriptionConfig() (*EthSubscription, error) {
}
// StartingBlock satisfies the SubscriptionSettings() interface
func (sc *EthSubscription) StartingBlock() *big.Int {
func (sc *SubscriptionSettings) StartingBlock() *big.Int {
return sc.Start
}
// EndingBlock satisfies the SubscriptionSettings() interface
func (sc *EthSubscription) EndingBlock() *big.Int {
func (sc *SubscriptionSettings) EndingBlock() *big.Int {
return sc.End
}
// HistoricalData satisfies the SubscriptionSettings() interface
func (sc *EthSubscription) HistoricalData() bool {
func (sc *SubscriptionSettings) HistoricalData() bool {
return sc.BackFill
}
// HistoricalDataOnly satisfies the SubscriptionSettings() interface
func (sc *EthSubscription) HistoricalDataOnly() bool {
func (sc *SubscriptionSettings) HistoricalDataOnly() bool {
return sc.BackFillOnly
}
// ChainType satisfies the SubscriptionSettings() interface
func (sc *EthSubscription) ChainType() ChainType {
return Ethereum
func (sc *SubscriptionSettings) ChainType() shared.ChainType {
return shared.Ethereum
}

View File

@ -31,7 +31,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
@ -90,7 +89,7 @@ type Service struct {
// Number of publishAndIndex workers
WorkerPoolSize int
// chain type for this service
chain config.ChainType
chain shared.ChainType
// Path to ipfs data dir
ipfsPath string
// Underlying db
@ -98,7 +97,7 @@ type Service struct {
}
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
func NewSuperNode(settings *config.SuperNode) (SuperNode, error) {
func NewSuperNode(settings *shared.SuperNodeConfig) (SuperNode, error) {
if err := ipfs.InitIPFSPlugins(); err != nil {
return nil, err
}

View File

@ -20,15 +20,14 @@ import (
"sync"
"time"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth/mocks"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
mocks2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared/mocks"
)
var _ = Describe("Service", func() {
@ -37,22 +36,22 @@ var _ = Describe("Service", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan shared.RawChainData, 1)
quitChan := make(chan bool, 1)
mockCidIndexer := &mocks2.CIDIndexer{
mockCidIndexer := &mocks.CIDIndexer{
ReturnErr: nil,
}
mockPublisher := &mocks2.IPLDPublisher{
ReturnCIDPayload: mocks2.MockCIDPayload,
mockPublisher := &mocks.IPLDPublisher{
ReturnCIDPayload: mocks.MockCIDPayload,
ReturnErr: nil,
}
mockStreamer := &mocks2.StateDiffStreamer{
mockStreamer := &mocks2.PayloadStreamer{
ReturnSub: &rpc.ClientSubscription{},
StreamPayloads: []statediff.Payload{
mocks2.MockStateDiffPayload,
StreamPayloads: []shared.RawChainData{
mocks.MockStateDiffPayload,
},
ReturnErr: nil,
}
mockConverter := &mocks2.PayloadConverter{
ReturnIPLDPayload: mocks2.MockIPLDPayload,
mockConverter := &mocks.PayloadConverter{
ReturnIPLDPayload: mocks.MockIPLDPayload,
ReturnErr: nil,
}
processor := &super_node.Service{
@ -69,10 +68,10 @@ var _ = Describe("Service", func() {
time.Sleep(2 * time.Second)
quitChan <- true
wg.Wait()
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks2.MockStateDiffPayload))
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks2.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks2.MockIPLDPayload))
Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload))
Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockIPLDPayload))
Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan))
})
})

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package config
package shared
import (
"errors"

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package config
package shared
import (
"fmt"
@ -36,8 +36,8 @@ import (
"github.com/vulcanize/vulcanizedb/utils"
)
// SuperNode config struct
type SuperNode struct {
// SuperNodeConfig struct
type SuperNodeConfig struct {
// Ubiquitous fields
Chain ChainType
IPFSPath string
@ -62,8 +62,8 @@ type SuperNode struct {
}
// NewSuperNodeConfig is used to initialize a SuperNode config from a config .toml file
func NewSuperNodeConfig() (*SuperNode, error) {
sn := new(SuperNode)
func NewSuperNodeConfig() (*SuperNodeConfig, error) {
sn := new(SuperNodeConfig)
sn.DBConfig = config.Database{
Name: viper.GetString("superNode.database.name"),
Hostname: viper.GetString("superNode.database.hostname"),
@ -128,7 +128,7 @@ func NewSuperNodeConfig() (*SuperNode, error) {
}
// BackFillFields is used to fill in the BackFill fields of the config
func (sn *SuperNode) BackFillFields() error {
func (sn *SuperNodeConfig) BackFillFields() error {
sn.BackFill = true
_, httpClient, err := getNodeAndClient(sn.Chain, viper.GetString("superNode.backFill.httpPath"))
if err != nil {

View File

@ -18,8 +18,6 @@ package shared
import (
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
)
// PayloadStreamer streams chain-specific payloads to the provided channel
@ -88,7 +86,7 @@ type DagPutter interface {
type SubscriptionSettings interface {
StartingBlock() *big.Int
EndingBlock() *big.Int
ChainType() config.ChainType
ChainType() ChainType
HistoricalData() bool
HistoricalDataOnly() bool
}

View File

@ -23,16 +23,16 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// StateDiffFetcher mock for tests
type StateDiffFetcher struct {
// IPLDFetcher mock for tests
type IPLDFetcher struct {
PayloadsToReturn map[uint64]shared.RawChainData
FetchErrs map[uint64]error
CalledAtBlockHeights [][]uint64
CalledTimes int64
}
// FetchStateDiffsAt mock method
func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
// FetchAt mock method
func (fetcher *IPLDFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
if fetcher.PayloadsToReturn == nil {
return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
}

View File

@ -18,20 +18,19 @@ package mocks
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
// PayloadStreamer mock struct
type PayloadStreamer struct {
PassedPayloadChan chan shared.RawChainData
ReturnSub *rpc.ClientSubscription
ReturnErr error
StreamPayloads []statediff.Payload
StreamPayloads []shared.RawChainData
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
// Stream mock method
func (sds *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
sds.PassedPayloadChan = payloadChan
go func() {