From aa2f8bd0a54f95215af09240ce9f7732d1cdead0 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Tue, 4 Feb 2020 19:02:01 -0600 Subject: [PATCH] btc retriever and filterer --- pkg/super_node/btc/converter.go | 2 +- pkg/super_node/btc/filterer.go | 81 +++++++++ pkg/super_node/btc/retriever.go | 210 ++++++++++++++++++++++ pkg/super_node/btc/subscription_config.go | 26 +-- pkg/super_node/btc/types.go | 28 ++- pkg/super_node/constructors.go | 1 + pkg/super_node/eth/filterer.go | 20 +-- pkg/super_node/eth/subscription_config.go | 4 +- pkg/super_node/eth/types.go | 28 +-- pkg/super_node/service.go | 8 +- pkg/super_node/shared/types.go | 16 +- 11 files changed, 347 insertions(+), 77 deletions(-) diff --git a/pkg/super_node/btc/converter.go b/pkg/super_node/btc/converter.go index 61fded86..7a622149 100644 --- a/pkg/super_node/btc/converter.go +++ b/pkg/super_node/btc/converter.go @@ -26,7 +26,7 @@ import ( ) // PayloadConverter satisfies the PayloadConverter interface for bitcoin -type PayloadConverter struct{ +type PayloadConverter struct { chainConfig *chaincfg.Params } diff --git a/pkg/super_node/btc/filterer.go b/pkg/super_node/btc/filterer.go index 8dd3c1ae..8897734c 100644 --- a/pkg/super_node/btc/filterer.go +++ b/pkg/super_node/btc/filterer.go @@ -15,3 +15,84 @@ // along with this program. If not, see . package btc + +import ( + "bytes" + "fmt" + "math/big" + + "github.com/btcsuite/btcutil" + + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// ResponseFilterer satisfies the ResponseFilterer interface for ethereum +type ResponseFilterer struct{} + +// NewResponseFilterer creates a new Filterer satisfying the ResponseFilterer interface +func NewResponseFilterer() *ResponseFilterer { + return &ResponseFilterer{} +} + +// Filter is used to filter through eth data to extract and package requested data into a Payload +func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) { + btcFilters, ok := filter.(*SubscriptionSettings) + if !ok { + return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter) + } + btcPayload, ok := payload.(IPLDPayload) + if !ok { + return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload) + } + height := int64(btcPayload.Height) + if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) { + response := new(StreamResponse) + if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil { + return StreamResponse{}, err + } + if err := s.filterTransactions(btcFilters.TxFilter, response, btcPayload); err != nil { + return StreamResponse{}, err + } + response.BlockNumber = big.NewInt(height) + return *response, nil + } + return StreamResponse{}, nil +} + +func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error { + if !headerFilter.Off { + headerBuffer := new(bytes.Buffer) + if err := payload.Header.Serialize(headerBuffer); err != nil { + return err + } + response.SerializedHeaders = append(response.SerializedHeaders, headerBuffer.Bytes()) + } + return nil +} + +func checkRange(start, end, actual int64) bool { + if (end <= 0 || end >= actual) && start <= actual { + return true + } + return false +} + +func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) error { + if !trxFilter.Off { + for _, trx := range payload.Txs { + if checkTransaction(trx, trxFilter) { + trxBuffer := new(bytes.Buffer) + if err := trx.MsgTx().Serialize(trxBuffer); err != nil { + return err + } + response.SerializedTxs = append(response.SerializedTxs, trxBuffer.Bytes()) + } + } + } + return nil +} + +// checkTransaction returns true if the provided transaction has a hit on the filter +func checkTransaction(trx *btcutil.Tx, txFilter TxFilter) bool { + panic("implement me") +} diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go index 8dd3c1ae..6a285d91 100644 --- a/pkg/super_node/btc/retriever.go +++ b/pkg/super_node/btc/retriever.go @@ -15,3 +15,213 @@ // along with this program. If not, see . package btc + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/jmoiron/sqlx" + log "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// CIDRetriever satisfies the CIDRetriever interface for bitcoin +type CIDRetriever struct { + db *postgres.DB +} + +// NewCIDRetriever returns a pointer to a new CIDRetriever which supports the CIDRetriever interface +func NewCIDRetriever(db *postgres.DB) *CIDRetriever { + return &CIDRetriever{ + db: db, + } +} + +// RetrieveFirstBlockNumber is used to retrieve the first block number in the db +func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { + var blockNumber int64 + err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") + return blockNumber, err +} + +// RetrieveLastBlockNumber is used to retrieve the latest block number in the db +func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { + var blockNumber int64 + err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ") + return blockNumber, err +} + +// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters +func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) { + streamFilter, ok := filter.(*SubscriptionSettings) + if !ok { + return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) + } + log.Debug("retrieving cids") + tx, err := ecr.db.Beginx() + if err != nil { + return nil, true, err + } + + cw := new(CIDWrapper) + cw.BlockNumber = big.NewInt(blockNumber) + // Retrieve cached header CIDs + if !streamFilter.HeaderFilter.Off { + cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("header cid retrieval error") + return nil, true, err + } + } + // Retrieve cached trx CIDs + if !streamFilter.TxFilter.Off { + cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("transaction cid retrieval error") + return nil, true, err + } + } + trxIds := make([]int64, 0, len(cw.Transactions)) + for _, tx := range cw.Transactions { + trxIds = append(trxIds, tx.ID) + } + return cw, empty(cw), tx.Commit() +} + +func empty(cidWrapper *CIDWrapper) bool { + if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 { + return false + } + return true +} + +// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight +func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { + log.Debug("retrieving header cids for block ", blockNumber) + headers := make([]HeaderModel, 0) + pgStr := `SELECT * FROM btc.header_cids + WHERE block_number = $1` + return headers, tx.Select(&headers, pgStr, blockNumber) +} + +// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters +// also returns the ids for the returned transaction cids +func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) { + log.Debug("retrieving transaction cids for block ", blockNumber) + args := make([]interface{}, 0, 3) + results := make([]TxModel, 0) + pgStr := `SELECT transaction_cids.id, transaction_cids.header_id, + transaction_cids.tx_hash, transaction_cids.cid, + transaction_cids.dst, transaction_cids.src, transaction_cids.index + FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + args = append(args, blockNumber) + return results, tx.Select(&results, pgStr, args...) +} + +// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db +func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) { + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids + LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1 + LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number + WHERE r.block_number is NULL and fr.block_number IS NOT NULL + GROUP BY header_cids.block_number, r.block_number` + results := make([]struct { + Start uint64 `db:"start"` + Stop uint64 `db:"stop"` + }, 0) + err := ecr.db.Select(&results, pgStr) + if err != nil { + return nil, err + } + gaps := make([]shared.Gap, len(results)) + for i, res := range results { + gaps[i] = shared.Gap{ + Start: res.Start, + Stop: res.Stop, + } + } + return gaps, nil +} + +// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash +func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { + log.Debug("retrieving block cids for block hash ", blockHash.String()) + tx, err := ecr.db.Beginx() + if err != nil { + return HeaderModel{}, nil, err + } + headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("header cid retrieval error") + return HeaderModel{}, nil, err + } + txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("tx cid retrieval error") + return HeaderModel{}, nil, err + } + return headerCID, txCIDs, tx.Commit() +} + +// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number +func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) { + log.Debug("retrieving block cids for block number ", blockNumber) + tx, err := ecr.db.Beginx() + if err != nil { + return HeaderModel{}, nil, err + } + headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("header cid retrieval error") + return HeaderModel{}, nil, err + } + if len(headerCID) < 1 { + return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) + } + txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) + } + log.Error("tx cid retrieval error") + return HeaderModel{}, nil, err + } + return headerCID[0], txCIDs, tx.Commit() +} + +// RetrieveHeaderCIDByHash returns the header for the given block hash +func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { + log.Debug("retrieving header cids for block hash ", blockHash.String()) + pgStr := `SELECT * FROM btc.header_cids + WHERE block_hash = $1` + var headerCID HeaderModel + return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) +} + +// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id +func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { + log.Debug("retrieving tx cids for block id ", headerID) + pgStr := `SELECT * FROM btc.transaction_cids + WHERE header_id = $1` + var txCIDs []TxModel + return txCIDs, tx.Select(&txCIDs, pgStr, headerID) +} diff --git a/pkg/super_node/btc/subscription_config.go b/pkg/super_node/btc/subscription_config.go index 8e4a59b2..f95e3036 100644 --- a/pkg/super_node/btc/subscription_config.go +++ b/pkg/super_node/btc/subscription_config.go @@ -64,30 +64,36 @@ type HeaderFilter struct { // TxFilter contains filter settings for txs type TxFilter struct { - Off bool - Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to - HasWitness bool - WitnessHashes []string + Off bool + // Top level trx filters + Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to + Segwit bool // allow filtering for segwit trxs + WitnessHashes []string // allow filtering for specific witness hashes + // TODO: trx input filters + // TODO: trx output filters } // Init is used to initialize a EthSubscription struct with env variables func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { sc := new(SubscriptionSettings) // Below default to false, which means we do not backfill by default - sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData") - sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly") + sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData") + sc.BackFillOnly = viper.GetBool("superNode.btcSubscription.historicalDataOnly") // Below default to 0 // 0 start means we start at the beginning and 0 end means we continue indefinitely - sc.Start = big.NewInt(viper.GetInt64("superNode.ethSubscription.startingBlock")) - sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock")) + sc.Start = big.NewInt(viper.GetInt64("superNode.btcSubscription.startingBlock")) + sc.End = big.NewInt(viper.GetInt64("superNode.btcSubscription.endingBlock")) // Below default to false, which means we get all headers and no uncles by default sc.HeaderFilter = HeaderFilter{ - Off: viper.GetBool("superNode.ethSubscription.off"), + Off: viper.GetBool("superNode.btcSubscription.headerFilter.off"), } // Below defaults to false and two slices of length 0 // Which means we get all transactions by default sc.TxFilter = TxFilter{ - Off: viper.GetBool("superNode.ethSubscription.txFilter.off"), + Off: viper.GetBool("superNode.btcSubscription.txFilter.off"), + Index: viper.GetInt64("superNode.btcSubscription.txFilter.index"), + Segwit: viper.GetBool("superNode.btcSubscription.txFilter.segwit"), + WitnessHashes: viper.GetStringSlice("superNode.btcSubscription.txFilter.witnessHashes"), } return sc, nil } diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index e1d6c629..2f58fd82 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -20,8 +20,6 @@ import ( "encoding/json" "math/big" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/ipfs/go-block-format" @@ -42,10 +40,6 @@ type IPLDPayload struct { TxMetaData []TxModelWithInsAndOuts } -func (ip IPLDPayload) Value() shared.StreamedIPLDs { - return ip -} - // CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres // Returned by IPLDPublisher // Passed to CIDIndexer @@ -72,10 +66,10 @@ type IPLDWrapper struct { Transactions []blocks.Block } -// StreamPayload holds the data streamed from the super node eth service to the requesting clients +// StreamResponse holds the data streamed from the super node eth service to the requesting clients // Returned by IPLDResolver and ResponseFilterer // Passed to client subscriptions -type StreamPayload struct { +type StreamResponse struct { BlockNumber *big.Int `json:"blockNumber"` SerializedHeaders [][]byte `json:"headerBytes"` SerializedTxs [][]byte `json:"transactionBytes"` @@ -84,20 +78,20 @@ type StreamPayload struct { err error } -func (sd *StreamPayload) ensureEncoded() { - if sd.encoded == nil && sd.err == nil { - sd.encoded, sd.err = json.Marshal(sd) +func (sr *StreamResponse) ensureEncoded() { + if sr.encoded == nil && sr.err == nil { + sr.encoded, sr.err = json.Marshal(sr) } } // Length to implement Encoder interface for StateDiff -func (sd *StreamPayload) Length() int { - sd.ensureEncoded() - return len(sd.encoded) +func (sr *StreamResponse) Length() int { + sr.ensureEncoded() + return len(sr.encoded) } // Encode to implement Encoder interface for StateDiff -func (sd *StreamPayload) Encode() ([]byte, error) { - sd.ensureEncoded() - return sd.encoded, sd.err +func (sr *StreamResponse) Encode() ([]byte, error) { + sr.ensureEncoded() + return sr.encoded, sr.err } diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index b73cb34b..9500d717 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -18,6 +18,7 @@ package super_node import ( "fmt" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/rpcclient" diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 3fb23016..639cde50 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -106,7 +106,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *Stre trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions)) if !trxFilter.Off { for i, trx := range payload.Block.Body().Transactions { - if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { + if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) { trxBuffer := new(bytes.Buffer) if err := trx.EncodeRLP(trxBuffer); err != nil { return nil, err @@ -119,7 +119,8 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *Stre return trxHashes, nil } -func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool { +// checkTransactionAddrs returns true if either the transaction src and dst are one of the wanted src and dst addresses +func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool { // If we aren't filtering for any addresses, every transaction is a go if len(wantedDst) == 0 && len(wantedSrc) == 0 { return true @@ -185,21 +186,16 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wa return false } +// filterMatch returns true if the actualTopics conform to the wantedTopics filter func filterMatch(wantedTopics, actualTopics [][]string) bool { - // actualTopics should always be length 4, members could be nil slices though - lenWantedTopics := len(wantedTopics) + // actualTopics should always be length 4, but the members can be nil slices matches := 0 for i, actualTopicSet := range actualTopics { - if i < lenWantedTopics { + if i < len(wantedTopics) && len(wantedTopics[i]) > 0 { // If we have topics in this filter slot, count as a match if one of the topics matches - if len(wantedTopics[i]) > 0 { - matches += slicesShareString(actualTopicSet, wantedTopics[i]) - } else { - // Filter slot is empty, not matching any topics at this slot => counts as a match - matches++ - } + matches += slicesShareString(actualTopicSet, wantedTopics[i]) } else { - // Filter slot doesn't exist, not matching any topics at this slot => count as a match + // Filter slot is either empty or doesn't exist => not matching any topics at this slot => counts as a match matches++ } } diff --git a/pkg/super_node/eth/subscription_config.go b/pkg/super_node/eth/subscription_config.go index 368d461a..f9c77496 100644 --- a/pkg/super_node/eth/subscription_config.go +++ b/pkg/super_node/eth/subscription_config.go @@ -86,8 +86,8 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) { sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock")) // Below default to false, which means we get all headers and no uncles by default sc.HeaderFilter = HeaderFilter{ - Off: viper.GetBool("superNode.ethSubscription.off"), - Uncles: viper.GetBool("superNode.ethSubscription.uncles"), + Off: viper.GetBool("superNode.ethSubscription.headerFilter.off"), + Uncles: viper.GetBool("superNode.ethSubscription.headerFilter.uncles"), } // Below defaults to false and two slices of length 0 // Which means we get all transactions by default diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 81729381..e30d8ce4 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -20,8 +20,6 @@ import ( "encoding/json" "math/big" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ipfs/go-block-format" @@ -40,10 +38,6 @@ type IPLDPayload struct { StorageNodes map[common.Hash][]TrieNode } -func (ip IPLDPayload) Value() shared.StreamedIPLDs { - return ip -} - // Trie struct used to flag node as leaf or not type TrieNode struct { Key common.Hash @@ -105,24 +99,20 @@ type StreamResponse struct { err error } -func (sr StreamResponse) Value() shared.ServerResponse { - return sr -} - -func (sd *StreamResponse) ensureEncoded() { - if sd.encoded == nil && sd.err == nil { - sd.encoded, sd.err = json.Marshal(sd) +func (sr *StreamResponse) ensureEncoded() { + if sr.encoded == nil && sr.err == nil { + sr.encoded, sr.err = json.Marshal(sr) } } // Length to implement Encoder interface for StateDiff -func (sd *StreamResponse) Length() int { - sd.ensureEncoded() - return len(sd.encoded) +func (sr *StreamResponse) Length() int { + sr.ensureEncoded() + return len(sr.encoded) } // Encode to implement Encoder interface for StateDiff -func (sd *StreamResponse) Encode() ([]byte, error) { - sd.ensureEncoded() - return sd.encoded, sd.err +func (sr *StreamResponse) Encode() ([]byte, error) { + sr.ensureEncoded() + return sr.encoded, sr.err } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index c650533f..31756092 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -204,12 +204,12 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha } // If we have a ScreenAndServe process running, forward the iplds to it select { - case screenAndServePayload <- ipldPayload.Value(): + case screenAndServePayload <- ipldPayload: default: } // Forward the payload to the publishAndIndex workers select { - case publishAndIndexPayload <- ipldPayload.Value(): + case publishAndIndexPayload <- ipldPayload: default: } case err := <-sub.Err(): @@ -296,7 +296,7 @@ func (sap *Service) sendResponse(payload shared.StreamedIPLDs) { } for id, sub := range subs { select { - case sub.PayloadChan <- SubscriptionPayload{response.Value(), ""}: + case sub.PayloadChan <- SubscriptionPayload{response, ""}: log.Infof("sending super node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) @@ -392,7 +392,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.Subscrip continue } select { - case sub.PayloadChan <- SubscriptionPayload{backFillIplds.Value(), ""}: + case sub.PayloadChan <- SubscriptionPayload{backFillIplds, ""}: log.Infof("sending super node historical data payload to subscription %s", id) default: log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) diff --git a/pkg/super_node/shared/types.go b/pkg/super_node/shared/types.go index 5045215e..00719dcd 100644 --- a/pkg/super_node/shared/types.go +++ b/pkg/super_node/shared/types.go @@ -19,12 +19,8 @@ package shared // These types serve as very loose wrappers around a generic underlying interface{} type RawChainData interface{} -// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values -// stored at that memory location and not a copy of the pointer itself. -// We want to avoid sending a pointer to publishAndIndex and screenAndServe channels; sharing memory across these processes -type StreamedIPLDs interface { - Value() StreamedIPLDs -} +// The concrete type underneath StreamedIPLDs should not be a pointer +type StreamedIPLDs interface{} type CIDsForIndexing interface{} @@ -32,12 +28,8 @@ type CIDsForFetching interface{} type FetchedIPLDs interface{} -// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values -// stored at that memory location and not a copy of the pointer itself. -// We want to avoid sending a pointer to subscription channels; sharing memory across all subscriptions -type ServerResponse interface { - Value() ServerResponse -} +// The concrete type underneath StreamedIPLDs should not be a pointer +type ServerResponse interface{} type Gap struct { Start uint64