wrap generic top-level interfaces with loose types

This commit is contained in:
Ian Norden 2020-01-31 12:03:37 -06:00
parent cacd9953ee
commit 4dde90447e
40 changed files with 230 additions and 161 deletions

View File

@ -65,7 +65,7 @@ func streamEthSubscription() {
str := streamer.NewSuperNodeStreamer(rpcClient)
// Buffered channel for reading subscription payloads
payloadChan := make(chan super_node.Payload, 20000)
payloadChan := make(chan super_node.SubscriptionPayload, 20000)
// Subscribe to the super node service with the given config/filter parameters
sub, err := str.Stream(payloadChan, ethSubConfig)
@ -81,9 +81,9 @@ func streamEthSubscription() {
logWithCommand.Error(payload.Err)
continue
}
data, ok := payload.Data.(eth.StreamPayload)
data, ok := payload.Data.(eth.StreamResponse)
if !ok {
logWithCommand.Warnf("payload data expected type %T got %T", eth.StreamPayload{}, payload.Data)
logWithCommand.Warnf("payload data expected type %T got %T", eth.StreamResponse{}, payload.Data)
continue
}
for _, headerRlp := range data.HeadersRlp {

View File

@ -18,6 +18,8 @@ package cmd
import (
"sync"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -59,10 +61,10 @@ func superNode() {
}
wg := &sync.WaitGroup{}
var forwardQuitChan chan bool
var forwardPayloadChan chan interface{}
var forwardPayloadChan chan shared.StreamedIPLDs
if superNodeConfig.Serve {
forwardQuitChan = make(chan bool)
forwardPayloadChan = make(chan interface{}, super_node.PayloadChanBufferSize)
forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize)
superNode.ScreenAndServe(wg, forwardPayloadChan, forwardQuitChan)
if err := startServers(superNode, superNodeConfig); err != nil {
logWithCommand.Fatal(err)

View File

@ -19,14 +19,15 @@ package streamer
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node
type ISuperNodeStreamer interface {
Stream(payloadChan chan super_node.Payload, params super_node.SubscriptionSettings) (*rpc.ClientSubscription, error)
Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error)
}
// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface
@ -42,6 +43,6 @@ func NewSuperNodeStreamer(client core.RPCClient) *SuperNodeStreamer {
}
// Stream is the main loop for subscribing to data from a vulcanizedb super node
func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.Payload, params super_node.SubscriptionSettings) (*rpc.ClientSubscription, error) {
func (sds *SuperNodeStreamer) Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) {
return sds.Client.Subscribe("vdb", payloadChan, "stream", params)
}

View File

@ -19,13 +19,13 @@ package transformer
import (
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
"github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
type SuperNodeTransformer interface {
Init() error
Execute() error
GetConfig() super_node.SubscriptionSettings
GetConfig() shared.SubscriptionSettings
}
type SuperNodeTransformerInitializer func(db *postgres.DB, subCon super_node.SubscriptionSettings, client core.RPCClient) SuperNodeTransformer
type SuperNodeTransformerInitializer func(db *postgres.DB, subCon shared.SubscriptionSettings, client core.RPCClient) SuperNodeTransformer

View File

@ -19,6 +19,8 @@ package super_node
import (
"context"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
@ -44,7 +46,7 @@ func NewPublicSuperNodeAPI(superNodeInterface SuperNode) *PublicSuperNodeAPI {
}
// Stream is the public method to setup a subscription that fires off super node payloads as they are processed
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params SubscriptionSettings) (*rpc.Subscription, error) {
func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params shared.SubscriptionSettings) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
@ -56,7 +58,7 @@ func (api *PublicSuperNodeAPI) Stream(ctx context.Context, params SubscriptionSe
go func() {
// subscribe to events from the SyncPublishScreenAndServe service
payloadChannel := make(chan Payload, PayloadChanBufferSize)
payloadChannel := make(chan SubscriptionPayload, PayloadChanBufferSize)
quitChan := make(chan bool, 1)
go api.sn.Subscribe(rpcSub.ID, payloadChannel, quitChan, params)

View File

@ -22,14 +22,11 @@ import (
"sync/atomic"
"time"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/ethereum/go-ethereum/params"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
const (
@ -71,7 +68,7 @@ func NewBackFillService(settings *config.SuperNode) (BackFillInterface, error) {
if err != nil {
return nil, err
}
converter, err := NewPayloadConverter(settings.Chain, params.MainnetChainConfig)
converter, err := NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}

View File

@ -20,7 +20,6 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -41,7 +40,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
@ -53,7 +52,7 @@ var _ = Describe("BackFiller", func() {
},
}
mockFetcher := &mocks.StateDiffFetcher{
PayloadsToReturn: map[uint64]statediff.Payload{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
101: mocks.MockStateDiffPayload,
},
@ -95,7 +94,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload},
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
@ -107,7 +106,7 @@ var _ = Describe("BackFiller", func() {
},
}
mockFetcher := &mocks.StateDiffFetcher{
PayloadsToReturn: map[uint64]statediff.Payload{
PayloadsToReturn: map[uint64]shared.RawChainData{
100: mocks.MockStateDiffPayload,
},
}
@ -145,7 +144,7 @@ var _ = Describe("BackFiller", func() {
ReturnErr: nil,
}
mockConverter := &mocks.IterativePayloadConverter{
ReturnIPLDPayload: []*eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnIPLDPayload: []eth.IPLDPayload{mocks.MockIPLDPayload, mocks.MockIPLDPayload},
ReturnErr: nil,
}
mockRetriever := &mocks.MockCIDRetriever{
@ -153,7 +152,7 @@ var _ = Describe("BackFiller", func() {
GapsToRetrieve: []shared.Gap{},
}
mockFetcher := &mocks.StateDiffFetcher{
PayloadsToReturn: map[uint64]statediff.Payload{
PayloadsToReturn: map[uint64]shared.RawChainData{
1: mocks.MockStateDiffPayload,
2: mocks.MockStateDiffPayload,
},

View File

@ -18,6 +18,8 @@ package btc
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// PayloadConverter satisfies the PayloadConverter interface for bitcoin
@ -30,7 +32,7 @@ func NewPayloadConverter() *PayloadConverter {
// Convert method is used to convert a bitcoin BlockPayload to an IPLDPayload
// Satisfies the shared.PayloadConverter interface
func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
btcBlockPayload, ok := payload.(BlockPayload)
if !ok {
return nil, fmt.Errorf("btc converter: expected payload type %T got %T", BlockPayload{}, payload)

View File

@ -48,8 +48,8 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
ipldPayload, ok := payload.(*IPLDPayload)
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(IPLDPayload)
if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload)
}

View File

@ -43,7 +43,7 @@ func NewPayloadStreamer(clientConfig *rpcclient.ConnConfig) *PayloadStreamer {
// Stream is the main loop for subscribing to data from the btc block notifications
// Satisfies the shared.PayloadStreamer interface
func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) {
func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
logrus.Info("streaming block payloads from btc")
blockNotificationHandler := rpcclient.NotificationHandlers{
// Notification handler for block connections, forwards new block data to the payloadChan

View File

@ -20,6 +20,8 @@ import (
"encoding/json"
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/ipfs/go-block-format"
@ -40,6 +42,10 @@ type IPLDPayload struct {
TxMetaData []TxModel
}
func (ip IPLDPayload) Value() shared.StreamedIPLDs {
return ip
}
// CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres
// Returned by IPLDPublisher
// Passed to CIDIndexer

View File

@ -64,7 +64,7 @@ func NewCIDRetriever(chain config.ChainType, db *postgres.DB) (shared.CIDRetriev
}
// NewPayloadStreamer constructs a PayloadStreamer for the provided chain type
func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan interface{}, error) {
func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.PayloadStreamer, chan shared.RawChainData, error) {
switch chain {
case config.Ethereum:
ethClient, ok := client.(core.RPCClient)
@ -72,14 +72,14 @@ func NewPayloadStreamer(chain config.ChainType, client interface{}) (shared.Payl
var expectedClientType core.RPCClient
return nil, nil, fmt.Errorf("ethereum payload streamer constructor expected client type %T got %T", expectedClientType, client)
}
streamChan := make(chan interface{}, eth.PayloadChanBufferSize)
streamChan := make(chan shared.RawChainData, eth.PayloadChanBufferSize)
return eth.NewPayloadStreamer(ethClient), streamChan, nil
case config.Bitcoin:
btcClientConn, ok := client.(*rpcclient.ConnConfig)
if !ok {
return nil, nil, fmt.Errorf("bitcoin payload streamer constructor expected client config type %T got %T", rpcclient.ConnConfig{}, client)
}
streamChan := make(chan interface{}, btc.PayloadChanBufferSize)
streamChan := make(chan shared.RawChainData, btc.PayloadChanBufferSize)
return btc.NewPayloadStreamer(btcClientConn), streamChan, nil
default:
return nil, nil, fmt.Errorf("invalid chain %T for streamer constructor", chain)
@ -102,14 +102,10 @@ func NewPaylaodFetcher(chain config.ChainType, client interface{}) (shared.Paylo
}
// NewPayloadConverter constructs a PayloadConverter for the provided chain type
func NewPayloadConverter(chain config.ChainType, settings interface{}) (shared.PayloadConverter, error) {
func NewPayloadConverter(chain config.ChainType) (shared.PayloadConverter, error) {
switch chain {
case config.Ethereum:
ethConfig, ok := settings.(*params.ChainConfig)
if !ok {
return nil, fmt.Errorf("ethereum converter constructor expected config type %T got %T", &params.ChainConfig{}, settings)
}
return eth.NewPayloadConverter(ethConfig), nil
return eth.NewPayloadConverter(params.MainnetChainConfig), nil
case config.Bitcoin:
return btc.NewPayloadConverter(), nil
default:

View File

@ -19,6 +19,8 @@ package eth
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
@ -40,7 +42,7 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *PayloadConverter {
// Convert method is used to convert a eth statediff.Payload to an IPLDPayload
// Satisfies the shared.PayloadConverter interface
func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
stateDiffPayload, ok := payload.(statediff.Payload)
if !ok {
return nil, fmt.Errorf("eth converter: expected payload type %T got %T", statediff.Payload{}, payload)
@ -51,7 +53,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
return nil, err
}
trxLen := len(block.Transactions())
convertedPayload := &IPLDPayload{
convertedPayload := IPLDPayload{
TotalDifficulty: stateDiffPayload.TotalDifficulty,
Block: block,
TxMetaData: make([]TxModel, 0, trxLen),

View File

@ -32,7 +32,7 @@ var _ = Describe("Converter", func() {
converter := eth.NewPayloadConverter(params.MainnetChainConfig)
payload, err := converter.Convert(mocks.MockStateDiffPayload)
Expect(err).ToNot(HaveOccurred())
convertedPayload, ok := payload.(*eth.IPLDPayload)
convertedPayload, ok := payload.(eth.IPLDPayload)
Expect(ok).To(BeTrue())
Expect(convertedPayload.Block.Number().String()).To(Equal(mocks.BlockNumber.String()))
Expect(convertedPayload.Block.Hash().String()).To(Equal(mocks.MockBlock.Hash().String()))

View File

@ -20,6 +20,8 @@ import (
"bytes"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -37,44 +39,44 @@ func NewResponseFilterer() *ResponseFilterer {
}
// Filter is used to filter through eth data to extract and package requested data into a Payload
func (s *ResponseFilterer) Filter(filter, payload interface{}) (interface{}, error) {
func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) {
ethFilters, ok := filter.(*config.EthSubscription)
if !ok {
return StreamPayload{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter)
return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &config.EthSubscription{}, filter)
}
ethPayload, ok := payload.(*IPLDPayload)
ethPayload, ok := payload.(IPLDPayload)
if !ok {
return StreamPayload{}, fmt.Errorf("eth filterer expected payload type %T got %T", &IPLDPayload{}, payload)
return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload)
}
if checkRange(ethFilters.Start.Int64(), ethFilters.End.Int64(), ethPayload.Block.Number().Int64()) {
response := new(StreamPayload)
response := new(StreamResponse)
if err := s.filterHeaders(ethFilters.HeaderFilter, response, ethPayload); err != nil {
return StreamPayload{}, err
return StreamResponse{}, err
}
txHashes, err := s.filterTransactions(ethFilters.TxFilter, response, ethPayload)
if err != nil {
return StreamPayload{}, err
return StreamResponse{}, err
}
var filterTxs []common.Hash
if ethFilters.ReceiptFilter.MatchTxs {
filterTxs = txHashes
}
if err := s.filerReceipts(ethFilters.ReceiptFilter, response, ethPayload, filterTxs); err != nil {
return StreamPayload{}, err
return StreamResponse{}, err
}
if err := s.filterState(ethFilters.StateFilter, response, ethPayload); err != nil {
return StreamPayload{}, err
return StreamResponse{}, err
}
if err := s.filterStorage(ethFilters.StorageFilter, response, ethPayload); err != nil {
return StreamPayload{}, err
return StreamResponse{}, err
}
response.BlockNumber = ethPayload.Block.Number()
return *response, nil
}
return StreamPayload{}, nil
return StreamResponse{}, nil
}
func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamPayload, payload *IPLDPayload) error {
func (s *ResponseFilterer) filterHeaders(headerFilter config.HeaderFilter, response *StreamResponse, payload IPLDPayload) error {
if !headerFilter.Off {
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
if err != nil {
@ -102,7 +104,7 @@ func checkRange(start, end, actual int64) bool {
return false
}
func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamPayload, payload *IPLDPayload) ([]common.Hash, error) {
func (s *ResponseFilterer) filterTransactions(trxFilter config.TxFilter, response *StreamResponse, payload IPLDPayload) ([]common.Hash, error) {
trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions))
if !trxFilter.Off {
for i, trx := range payload.Block.Body().Transactions {
@ -137,7 +139,7 @@ func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst strin
return false
}
func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamPayload, payload *IPLDPayload, trxHashes []common.Hash) error {
func (s *ResponseFilterer) filerReceipts(receiptFilter config.ReceiptFilter, response *StreamResponse, payload IPLDPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off {
for i, receipt := range payload.Receipts {
// topics is always length 4
@ -221,7 +223,7 @@ func slicesShareString(slice1, slice2 []string) int {
return 0
}
func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamPayload, payload *IPLDPayload) error {
func (s *ResponseFilterer) filterState(stateFilter config.StateFilter, response *StreamResponse, payload IPLDPayload) error {
if !stateFilter.Off {
response.StateNodesRlp = make(map[common.Hash][]byte)
keyFilters := make([]common.Hash, len(stateFilter.Addresses))
@ -252,7 +254,7 @@ func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool {
return false
}
func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamPayload, payload *IPLDPayload) error {
func (s *ResponseFilterer) filterStorage(storageFilter config.StorageFilter, response *StreamResponse, payload IPLDPayload) error {
if !storageFilter.Off {
response.StorageNodesRlp = make(map[common.Hash]map[common.Hash][]byte)
stateKeyFilters := make([]common.Hash, len(storageFilter.Addresses))

View File

@ -45,7 +45,7 @@ var _ = Describe("Filterer", func() {
It("Transcribes all the data from the IPLDPayload into the StreamPayload if given an open filter", func() {
payload, err := filterer.Filter(openFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload, ok := payload.(eth.StreamPayload)
superNodePayload, ok := payload.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp))
@ -66,7 +66,7 @@ var _ = Describe("Filterer", func() {
It("Applies filters from the provided config.Subscription", func() {
payload1, err := filterer.Filter(rctContractFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload1, ok := payload1.(eth.StreamPayload)
superNodePayload1, ok := payload1.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload1.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload1.HeadersRlp)).To(Equal(0))
@ -79,7 +79,7 @@ var _ = Describe("Filterer", func() {
payload2, err := filterer.Filter(rctTopicsFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload2, ok := payload2.(eth.StreamPayload)
superNodePayload2, ok := payload2.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload2.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload2.HeadersRlp)).To(Equal(0))
@ -92,7 +92,7 @@ var _ = Describe("Filterer", func() {
payload3, err := filterer.Filter(rctTopicsAndContractFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload3, ok := payload3.(eth.StreamPayload)
superNodePayload3, ok := payload3.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload3.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload3.HeadersRlp)).To(Equal(0))
@ -105,7 +105,7 @@ var _ = Describe("Filterer", func() {
payload4, err := filterer.Filter(rctContractsAndTopicFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload4, ok := payload4.(eth.StreamPayload)
superNodePayload4, ok := payload4.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload4.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload4.HeadersRlp)).To(Equal(0))
@ -118,7 +118,7 @@ var _ = Describe("Filterer", func() {
payload5, err := filterer.Filter(rctsForAllCollectedTrxs, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload5, ok := payload5.(eth.StreamPayload)
superNodePayload5, ok := payload5.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload5.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload5.HeadersRlp)).To(Equal(0))
@ -134,7 +134,7 @@ var _ = Describe("Filterer", func() {
payload6, err := filterer.Filter(rctsForSelectCollectedTrxs, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload6, ok := payload6.(eth.StreamPayload)
superNodePayload6, ok := payload6.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload6.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload6.HeadersRlp)).To(Equal(0))
@ -148,7 +148,7 @@ var _ = Describe("Filterer", func() {
payload7, err := filterer.Filter(stateFilter, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload7, ok := payload7.(eth.StreamPayload)
superNodePayload7, ok := payload7.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload7.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload7.HeadersRlp)).To(Equal(0))
@ -161,7 +161,7 @@ var _ = Describe("Filterer", func() {
payload8, err := filterer.Filter(rctTopicsAndContractFilterFail, mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
superNodePayload8, ok := payload8.(eth.StreamPayload)
superNodePayload8, ok := payload8.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload8.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(len(superNodePayload8.HeadersRlp)).To(Equal(0))

View File

@ -19,6 +19,8 @@ package eth
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
@ -40,7 +42,7 @@ func NewCIDIndexer(db *postgres.DB) *CIDIndexer {
}
// Index indexes a cidPayload in Postgres
func (in *CIDIndexer) Index(cids interface{}) error {
func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
cidPayload, ok := cids.(*CIDPayload)
if !ok {
return fmt.Errorf("eth indexer expected cids type %T got %T", &CIDPayload{}, cids)

View File

@ -21,6 +21,8 @@ import (
"errors"
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
@ -51,7 +53,7 @@ func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) {
}
// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper
func (f *IPLDFetcher) Fetch(cids interface{}) (interface{}, error) {
func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.FetchedIPLDs, error) {
cidWrapper, ok := cids.(*CIDWrapper)
if !ok {
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)

View File

@ -19,6 +19,8 @@ package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
@ -27,12 +29,12 @@ import (
// PayloadConverter is the underlying struct for the Converter interface
type PayloadConverter struct {
PassedStatediffPayload statediff.Payload
ReturnIPLDPayload *eth.IPLDPayload
ReturnIPLDPayload eth.IPLDPayload
ReturnErr error
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
stateDiffPayload, ok := payload.(statediff.Payload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload)
@ -44,13 +46,13 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
// IterativePayloadConverter is the underlying struct for the Converter interface
type IterativePayloadConverter struct {
PassedStatediffPayload []statediff.Payload
ReturnIPLDPayload []*eth.IPLDPayload
ReturnIPLDPayload []eth.IPLDPayload
ReturnErr error
iteration int
}
// Convert method is used to convert a geth statediff.Payload to a IPLDPayload
func (pc *IterativePayloadConverter) Convert(payload interface{}) (interface{}, error) {
func (pc *IterativePayloadConverter) Convert(payload shared.RawChainData) (shared.StreamedIPLDs, error) {
stateDiffPayload, ok := payload.(statediff.Payload)
if !ok {
return nil, fmt.Errorf("convert expected payload type %T got %T", statediff.Payload{}, payload)

View File

@ -20,25 +20,25 @@ import (
"errors"
"sync/atomic"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// StateDiffFetcher mock for tests
type StateDiffFetcher struct {
PayloadsToReturn map[uint64]statediff.Payload
PayloadsToReturn map[uint64]shared.RawChainData
FetchErrs map[uint64]error
CalledAtBlockHeights [][]uint64
CalledTimes int64
}
// FetchStateDiffsAt mock method
func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]interface{}, error) {
func (fetcher *StateDiffFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
if fetcher.PayloadsToReturn == nil {
return nil, errors.New("mock StateDiffFetcher needs to be initialized with payloads to return")
}
atomic.AddInt64(&fetcher.CalledTimes, 1) // thread-safe increment
fetcher.CalledAtBlockHeights = append(fetcher.CalledAtBlockHeights, blockHeights)
results := make([]interface{}, 0, len(blockHeights))
results := make([]shared.RawChainData, 0, len(blockHeights))
for _, height := range blockHeights {
results = append(results, fetcher.PayloadsToReturn[height])
err, ok := fetcher.FetchErrs[height]

View File

@ -19,6 +19,8 @@ package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)
@ -29,7 +31,7 @@ type CIDIndexer struct {
}
// Index indexes a cidPayload in Postgres
func (repo *CIDIndexer) Index(cids interface{}) error {
func (repo *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
cidPayload, ok := cids.(*eth.CIDPayload)
if !ok {
return fmt.Errorf("index expected cids type %T got %T", &eth.CIDPayload{}, cids)

View File

@ -19,19 +19,21 @@ package mocks
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
)
// IPLDPublisher is the underlying struct for the Publisher interface
type IPLDPublisher struct {
PassedIPLDPayload *eth.IPLDPayload
PassedIPLDPayload eth.IPLDPayload
ReturnCIDPayload *eth.CIDPayload
ReturnErr error
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
ipldPayload, ok := payload.(*eth.IPLDPayload)
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(eth.IPLDPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.IPLDPayload{}, payload)
}
@ -41,15 +43,15 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
// IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing
type IterativeIPLDPublisher struct {
PassedIPLDPayload []*eth.IPLDPayload
PassedIPLDPayload []eth.IPLDPayload
ReturnCIDPayload []*eth.CIDPayload
ReturnErr error
iteration int
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IterativeIPLDPublisher) Publish(payload interface{}) (interface{}, error) {
ipldPayload, ok := payload.(*eth.IPLDPayload)
func (pub *IterativeIPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(eth.IPLDPayload)
if !ok {
return nil, fmt.Errorf("publish expected payload type %T got %T", &eth.IPLDPayload{}, payload)
}

View File

@ -31,7 +31,7 @@ type MockCIDRetriever struct {
}
// RetrieveCIDs mock method
func (*MockCIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) {
func (*MockCIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
panic("implement me")
}

View File

@ -24,14 +24,14 @@ import (
// StateDiffStreamer is the underlying struct for the Streamer interface
type StateDiffStreamer struct {
PassedPayloadChan chan interface{}
PassedPayloadChan chan shared.RawChainData
ReturnSub *rpc.ClientSubscription
ReturnErr error
StreamPayloads []statediff.Payload
}
// Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) {
func (sds *StateDiffStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
sds.PassedPayloadChan = payloadChan
go func() {

View File

@ -233,7 +233,7 @@ var (
TotalDifficulty: big.NewInt(1337),
}
MockIPLDPayload = &eth.IPLDPayload{
MockIPLDPayload = eth.IPLDPayload{
TotalDifficulty: big.NewInt(1337),
Block: MockBlock,
Receipts: MockReceipts,
@ -318,7 +318,7 @@ var (
},
}
MockSeedNodePayload = eth2.StreamPayload{
MockSeedNodePayload = eth2.StreamResponse{
BlockNumber: big.NewInt(1),
HeadersRlp: [][]byte{MockHeaderRlp},
UnclesRlp: [][]byte{},

View File

@ -19,6 +19,8 @@ package eth
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/pkg/eth/client"
@ -47,7 +49,7 @@ func NewPayloadFetcher(bc BatchClient) *PayloadFetcher {
// FetchAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]interface{}, error) {
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
batch := make([]client.BatchElem, 0)
for _, height := range blockHeights {
batch = append(batch, client.BatchElem{
@ -60,7 +62,7 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]interface{}, er
if batchErr != nil {
return nil, fmt.Errorf("PayloadFetcher err: %s", batchErr.Error())
}
results := make([]interface{}, 0, len(blockHeights))
results := make([]shared.RawChainData, 0, len(blockHeights))
for _, batchElem := range batch {
if batchElem.Error != nil {
return nil, fmt.Errorf("PayloadFetcher err: %s", batchElem.Error.Error())

View File

@ -53,10 +53,10 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
}
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
ipldPayload, ok := payload.(*IPLDPayload)
func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForIndexing, error) {
ipldPayload, ok := payload.(IPLDPayload)
if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &IPLDPayload{}, payload)
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", IPLDPayload{}, payload)
}
// Process and publish headers
headerCid, err := pub.publishHeader(ipldPayload.Block.Header())

View File

@ -19,6 +19,8 @@ package eth
import (
"fmt"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-block-format"
)
@ -32,12 +34,12 @@ func NewIPLDResolver() *IPLDResolver {
}
// Resolve is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *IPLDResolver) Resolve(iplds interface{}) (interface{}, error) {
func (eir *IPLDResolver) Resolve(iplds shared.FetchedIPLDs) (shared.ServerResponse, error) {
ipfsBlocks, ok := iplds.(*IPLDWrapper)
if !ok {
return StreamPayload{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds)
return StreamResponse{}, fmt.Errorf("eth resolver expected iplds type %T got %T", &IPLDWrapper{}, iplds)
}
return StreamPayload{
return StreamResponse{
BlockNumber: ipfsBlocks.BlockNumber,
HeadersRlp: eir.ResolveHeaders(ipfsBlocks.Headers),
UnclesRlp: eir.ResolveUncles(ipfsBlocks.Uncles),

View File

@ -37,7 +37,7 @@ var _ = Describe("Resolver", func() {
It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() {
payload, err := resolver.Resolve(mocks.MockIPLDWrapper)
Expect(err).ToNot(HaveOccurred())
superNodePayload, ok := payload.(eth.StreamPayload)
superNodePayload, ok := payload.(eth.StreamResponse)
Expect(ok).To(BeTrue())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeedNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeedNodePayload.HeadersRlp))

View File

@ -58,7 +58,7 @@ func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
}
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *CIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error) {
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
streamFilter, ok := filter.(*config.EthSubscription)
if !ok {
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &config.EthSubscription{}, filter)

View File

@ -41,7 +41,7 @@ func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer {
// Stream is the main loop for subscribing to data from the Geth state diff process
// Satisfies the shared.PayloadStreamer interface
func (ps *PayloadStreamer) Stream(payloadChan chan interface{}) (shared.ClientSubscription, error) {
func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) {
logrus.Info("streaming diffs from geth")
return ps.Client.Subscribe("statediff", payloadChan, "stream")
}

View File

@ -17,6 +17,7 @@ package eth_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/vulcanize/vulcanizedb/pkg/eth/fakes"
"github.com/vulcanize/vulcanizedb/pkg/super_node/eth"
@ -26,7 +27,7 @@ var _ = Describe("StateDiff Streamer", func() {
It("subscribes to the geth statediff service", func() {
client := &fakes.MockRPCClient{}
streamer := eth.NewPayloadStreamer(client)
payloadChan := make(chan interface{})
payloadChan := make(chan shared.RawChainData)
_, err := streamer.Stream(payloadChan)
Expect(err).NotTo(HaveOccurred())
client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"})

View File

@ -20,6 +20,8 @@ import (
"encoding/json"
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-block-format"
@ -38,6 +40,10 @@ type IPLDPayload struct {
StorageNodes map[common.Hash][]TrieNode
}
func (ip IPLDPayload) Value() shared.StreamedIPLDs {
return ip
}
// Trie struct used to flag node as leaf or not
type TrieNode struct {
Key common.Hash
@ -83,10 +89,10 @@ type IPLDWrapper struct {
StorageNodes map[common.Hash]map[common.Hash]blocks.Block
}
// StreamPayload holds the data streamed from the super node eth service to the requesting clients
// StreamResponse holds the data streamed from the super node eth service to the requesting clients
// Returned by IPLDResolver and ResponseFilterer
// Passed to client subscriptions
type StreamPayload struct {
type StreamResponse struct {
BlockNumber *big.Int `json:"blockNumber"`
HeadersRlp [][]byte `json:"headersRlp"`
UnclesRlp [][]byte `json:"unclesRlp"`
@ -99,20 +105,24 @@ type StreamPayload struct {
err error
}
func (sd *StreamPayload) ensureEncoded() {
func (sr StreamResponse) Value() shared.ServerResponse {
return sr
}
func (sd *StreamResponse) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
func (sd *StreamPayload) Length() int {
func (sd *StreamResponse) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sd *StreamPayload) Encode() ([]byte, error) {
func (sd *StreamResponse) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}

View File

@ -21,7 +21,7 @@ import log "github.com/sirupsen/logrus"
func sendNonBlockingErr(sub Subscription, err error) {
log.Error(err)
select {
case sub.PayloadChan <- Payload{nil, err.Error()}:
case sub.PayloadChan <- SubscriptionPayload{nil, err.Error()}:
default:
log.Infof("unable to send error to subscription %s", sub.ID)
}

View File

@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
@ -47,11 +46,11 @@ type SuperNode interface {
// APIs(), Protocols(), Start() and Stop()
node.Service
// Main event loop for syncAndPublish processes
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- interface{}, forwardQuitchan chan<- bool) error
SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs, forwardQuitchan chan<- bool) error
// Main event loop for handling client pub-sub
ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan interface{}, screenAndServeQuit <-chan bool)
ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool)
// Method to subscribe to receive state diff processing output
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params SubscriptionSettings)
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
// Method to unsubscribe from state diff processing
Unsubscribe(id rpc.ID)
// Method to access the node info for this service
@ -79,13 +78,13 @@ type Service struct {
// Interface for resolving IPLDs to their data types
Resolver shared.IPLDResolver
// Chan the processor uses to subscribe to payloads from the Streamer
PayloadChan chan interface{}
PayloadChan chan shared.RawChainData
// Used to signal shutdown of the service
QuitChan chan bool
// A mapping of rpc.IDs to their subscription channels, mapped to their subscription type (hash of the StreamFilters)
Subscriptions map[common.Hash]map[rpc.ID]Subscription
// A mapping of subscription params hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]SubscriptionSettings
SubscriptionTypes map[common.Hash]shared.SubscriptionSettings
// Info for the Geth node that this super node is working with
NodeInfo core.Node
// Number of publishAndIndex workers
@ -111,7 +110,7 @@ func NewSuperNode(settings *config.SuperNode) (SuperNode, error) {
if err != nil {
return nil, err
}
sn.Converter, err = NewPayloadConverter(settings.Chain, params.MainnetChainConfig)
sn.Converter, err = NewPayloadConverter(settings.Chain)
if err != nil {
return nil, err
}
@ -145,7 +144,7 @@ func NewSuperNode(settings *config.SuperNode) (SuperNode, error) {
}
sn.QuitChan = settings.Quit
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
sn.SubscriptionTypes = make(map[common.Hash]SubscriptionSettings)
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
sn.WorkerPoolSize = settings.Workers
sn.NodeInfo = settings.NodeInfo
sn.ipfsPath = settings.IPFSPath
@ -180,7 +179,7 @@ func (sap *Service) APIs() []rpc.API {
// SyncAndPublish is the backend processing loop which streams data from geth, converts it to iplds, publishes them to ipfs, and indexes their cids
// This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
// which filters and sends relevant data to client subscriptions, if there are any
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- interface{}, screenAndServeQuit chan<- bool) error {
func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs, screenAndServeQuit chan<- bool) error {
sub, err := sap.Streamer.Stream(sap.PayloadChan)
if err != nil {
return err
@ -188,7 +187,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
wg.Add(1)
// Channels for forwarding data to the publishAndIndex workers
publishAndIndexPayload := make(chan interface{}, PayloadChanBufferSize)
publishAndIndexPayload := make(chan shared.StreamedIPLDs, PayloadChanBufferSize)
publishAndIndexQuit := make(chan bool, sap.WorkerPoolSize)
// publishAndIndex worker pool to handle publishing and indexing concurrently, while
// limiting the number of Postgres connections we can possibly open so as to prevent error
@ -204,14 +203,14 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
log.Error(err)
continue
}
// If we have a ScreenAndServe process running, forward the payload to it
// If we have a ScreenAndServe process running, forward the iplds to it
select {
case screenAndServePayload <- ipldPayload:
case screenAndServePayload <- ipldPayload.Value():
default:
}
// Forward the payload to the publishAndIndex workers
select {
case publishAndIndexPayload <- ipldPayload:
case publishAndIndexPayload <- ipldPayload.Value():
default:
}
case err := <-sub.Err():
@ -239,7 +238,7 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
return nil
}
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan interface{}, publishAndIndexQuit <-chan bool) {
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs, publishAndIndexQuit <-chan bool) {
go func() {
for {
select {
@ -263,7 +262,7 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan interf
// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan interface{}, screenAndServeQuit <-chan bool) {
func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs, screenAndServeQuit <-chan bool) {
wg.Add(1)
go func() {
for {
@ -280,7 +279,7 @@ func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
log.Info("screenAndServe goroutine successfully spun up")
}
func (sap *Service) sendResponse(payload interface{}) {
func (sap *Service) sendResponse(payload shared.StreamedIPLDs) {
sap.Lock()
for ty, subs := range sap.Subscriptions {
// Retrieve the subscription parameters for this subscription type
@ -298,7 +297,7 @@ func (sap *Service) sendResponse(payload interface{}) {
}
for id, sub := range subs {
select {
case sub.PayloadChan <- Payload{response, ""}:
case sub.PayloadChan <- SubscriptionPayload{response.Value(), ""}:
log.Infof("sending super node payload to subscription %s", id)
default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
@ -309,8 +308,8 @@ func (sap *Service) sendResponse(payload interface{}) {
}
// Subscribe is used by the API to subscribe to the service loop
// The params must be rlp serializable and satisfy the Params() interface
func (sap *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params SubscriptionSettings) {
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
log.Info("Subscribing to the super node service")
subscription := Subscription{
ID: id,
@ -351,7 +350,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo
}
}
func (sap *Service) backFill(sub Subscription, id rpc.ID, params SubscriptionSettings) error {
func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error {
log.Debug("sending historical data for subscriber", id)
// Retrieve cached CIDs relevant to this subscriber
var endingBlock int64
@ -394,7 +393,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params SubscriptionSet
continue
}
select {
case sub.PayloadChan <- Payload{backFillIplds, ""}:
case sub.PayloadChan <- SubscriptionPayload{backFillIplds.Value(), ""}:
log.Infof("sending super node historical data payload to subscription %s", id)
default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
@ -423,7 +422,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
func (sap *Service) Start(*p2p.Server) error {
log.Info("Starting super node service")
wg := new(sync.WaitGroup)
payloadChan := make(chan interface{}, PayloadChanBufferSize)
payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize)
quitChan := make(chan bool, 1)
if err := sap.SyncAndPublish(wg, payloadChan, quitChan); err != nil {
return err

View File

@ -20,6 +20,8 @@ import (
"sync"
"time"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
@ -33,7 +35,7 @@ var _ = Describe("Service", func() {
Describe("SyncAndPublish", func() {
It("Streams statediff.Payloads, converts them to IPLDPayloads, publishes IPLDPayloads, and indexes CIDPayloads", func() {
wg := new(sync.WaitGroup)
payloadChan := make(chan interface{}, 1)
payloadChan := make(chan shared.RawChainData, 1)
quitChan := make(chan bool, 1)
mockCidIndexer := &mocks2.CIDIndexer{
ReturnErr: nil,

View File

@ -16,7 +16,10 @@
package shared
import "bytes"
import (
"bytes"
"reflect"
)
// ListContainsString used to check if a list of strings contains a particular string
func ListContainsString(sss []string, s string) bool {
@ -47,3 +50,8 @@ func ListContainsGap(gapList []Gap, gap Gap) bool {
}
return false
}
// IsPointer returns true if the concrete type underneath the provided interface is a pointer
func IsPointer(i interface{}) bool {
return reflect.ValueOf(i).Type().Kind() == reflect.Ptr
}

View File

@ -16,39 +16,45 @@
package shared
import (
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
)
// PayloadStreamer streams chain-specific payloads to the provided channel
type PayloadStreamer interface {
Stream(payloadChan chan interface{}) (ClientSubscription, error)
Stream(payloadChan chan RawChainData) (ClientSubscription, error)
}
// PayloadFetcher fetches chain-specific payloads
type PayloadFetcher interface {
FetchAt(blockHeights []uint64) ([]interface{}, error)
FetchAt(blockHeights []uint64) ([]RawChainData, error)
}
// PayloadConverter converts chain-specific payloads into IPLD payloads for publishing
type PayloadConverter interface {
Convert(payload interface{}) (interface{}, error)
Convert(payload RawChainData) (StreamedIPLDs, error)
}
// IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing
type IPLDPublisher interface {
Publish(payload interface{}) (interface{}, error)
Publish(payload StreamedIPLDs) (CIDsForIndexing, error)
}
// CIDIndexer indexes a CID payload in Postgres
type CIDIndexer interface {
Index(cids interface{}) error
Index(cids CIDsForIndexing) error
}
// ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet
type ResponseFilterer interface {
Filter(filter, payload interface{}) (response interface{}, err error)
Filter(filter SubscriptionSettings, payload StreamedIPLDs) (response ServerResponse, err error)
}
// CIDRetriever retrieves cids according to a provided filter and returns a CID wrapper
type CIDRetriever interface {
Retrieve(filter interface{}, blockNumber int64) (interface{}, bool, error)
Retrieve(filter SubscriptionSettings, blockNumber int64) (CIDsForFetching, bool, error)
RetrieveFirstBlockNumber() (int64, error)
RetrieveLastBlockNumber() (int64, error)
RetrieveGapsInData() ([]Gap, error)
@ -56,12 +62,12 @@ type CIDRetriever interface {
// IPLDFetcher uses a CID wrapper to fetch an IPLD wrapper
type IPLDFetcher interface {
Fetch(cids interface{}) (interface{}, error)
Fetch(cids CIDsForFetching) (FetchedIPLDs, error)
}
// IPLDResolver resolves an IPLD wrapper into chain-specific payloads
type IPLDResolver interface {
Resolve(iplds interface{}) (interface{}, error)
Resolve(iplds FetchedIPLDs) (ServerResponse, error)
}
// ClientSubscription is a general interface for chain data subscriptions
@ -74,3 +80,15 @@ type ClientSubscription interface {
type DagPutter interface {
DagPut(raw interface{}) ([]string, error)
}
// SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain
// Further specifics of the underlying filter type depend on the internal needs of the types
// which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain
// The underlying type needs to be rlp serializable
type SubscriptionSettings interface {
StartingBlock() *big.Int
EndingBlock() *big.Int
ChainType() config.ChainType
HistoricalData() bool
HistoricalDataOnly() bool
}

View File

@ -16,6 +16,29 @@
package shared
// These types serve as very loose wrappers around a generic underlying interface{}
type RawChainData interface{}
// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values
// stored at that memory location and not a copy of the pointer itself.
// We want to avoid sending a pointer to publishAndIndex and screenAndServe channels; sharing memory across these processes
type StreamedIPLDs interface {
Value() StreamedIPLDs
}
type CIDsForIndexing interface{}
type CIDsForFetching interface{}
type FetchedIPLDs interface{}
// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values
// stored at that memory location and not a copy of the pointer itself.
// We want to avoid sending a pointer to subscription channels; sharing memory across all subscriptions
type ServerResponse interface {
Value() ServerResponse
}
type Gap struct {
Start uint64
Stop uint64

View File

@ -17,35 +17,20 @@
package super_node
import (
"math/big"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// Subscription holds the information for an individual client subscription to the super node
type Subscription struct {
ID rpc.ID
PayloadChan chan<- Payload
PayloadChan chan<- SubscriptionPayload
QuitChan chan<- bool
}
// Payload is the struct for a super node stream payload
// SubscriptionPayload is the struct for a super node stream payload
// It carries data of a type specific to the chain being supported/queried and an error message
type Payload struct {
Data interface{} `json:"data"` // e.g. for Ethereum eth.StreamPayload
type SubscriptionPayload struct {
Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload
Err string `json:"err"`
}
// SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain
// Further specifics of the underlying filter type depend on the internal needs of the types
// which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain
// The underlying type needs to be rlp serializable
type SubscriptionSettings interface {
StartingBlock() *big.Int
EndingBlock() *big.Int
ChainType() config.ChainType
HistoricalData() bool
HistoricalDataOnly() bool
}