diff --git a/pkg/super_node/btc/converter.go b/pkg/super_node/btc/converter.go
index 61fded86..7a622149 100644
--- a/pkg/super_node/btc/converter.go
+++ b/pkg/super_node/btc/converter.go
@@ -26,7 +26,7 @@ import (
)
// PayloadConverter satisfies the PayloadConverter interface for bitcoin
-type PayloadConverter struct{
+type PayloadConverter struct {
chainConfig *chaincfg.Params
}
diff --git a/pkg/super_node/btc/filterer.go b/pkg/super_node/btc/filterer.go
index 8dd3c1ae..8897734c 100644
--- a/pkg/super_node/btc/filterer.go
+++ b/pkg/super_node/btc/filterer.go
@@ -15,3 +15,84 @@
// along with this program. If not, see .
package btc
+
+import (
+ "bytes"
+ "fmt"
+ "math/big"
+
+ "github.com/btcsuite/btcutil"
+
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+)
+
+// ResponseFilterer satisfies the ResponseFilterer interface for ethereum
+type ResponseFilterer struct{}
+
+// NewResponseFilterer creates a new Filterer satisfying the ResponseFilterer interface
+func NewResponseFilterer() *ResponseFilterer {
+ return &ResponseFilterer{}
+}
+
+// Filter is used to filter through eth data to extract and package requested data into a Payload
+func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload shared.StreamedIPLDs) (shared.ServerResponse, error) {
+ btcFilters, ok := filter.(*SubscriptionSettings)
+ if !ok {
+ return StreamResponse{}, fmt.Errorf("eth filterer expected filter type %T got %T", &SubscriptionSettings{}, filter)
+ }
+ btcPayload, ok := payload.(IPLDPayload)
+ if !ok {
+ return StreamResponse{}, fmt.Errorf("eth filterer expected payload type %T got %T", IPLDPayload{}, payload)
+ }
+ height := int64(btcPayload.Height)
+ if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) {
+ response := new(StreamResponse)
+ if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil {
+ return StreamResponse{}, err
+ }
+ if err := s.filterTransactions(btcFilters.TxFilter, response, btcPayload); err != nil {
+ return StreamResponse{}, err
+ }
+ response.BlockNumber = big.NewInt(height)
+ return *response, nil
+ }
+ return StreamResponse{}, nil
+}
+
+func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *StreamResponse, payload IPLDPayload) error {
+ if !headerFilter.Off {
+ headerBuffer := new(bytes.Buffer)
+ if err := payload.Header.Serialize(headerBuffer); err != nil {
+ return err
+ }
+ response.SerializedHeaders = append(response.SerializedHeaders, headerBuffer.Bytes())
+ }
+ return nil
+}
+
+func checkRange(start, end, actual int64) bool {
+ if (end <= 0 || end >= actual) && start <= actual {
+ return true
+ }
+ return false
+}
+
+func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *StreamResponse, payload IPLDPayload) error {
+ if !trxFilter.Off {
+ for _, trx := range payload.Txs {
+ if checkTransaction(trx, trxFilter) {
+ trxBuffer := new(bytes.Buffer)
+ if err := trx.MsgTx().Serialize(trxBuffer); err != nil {
+ return err
+ }
+ response.SerializedTxs = append(response.SerializedTxs, trxBuffer.Bytes())
+ }
+ }
+ }
+ return nil
+}
+
+// checkTransaction returns true if the provided transaction has a hit on the filter
+func checkTransaction(trx *btcutil.Tx, txFilter TxFilter) bool {
+ panic("implement me")
+}
diff --git a/pkg/super_node/btc/retriever.go b/pkg/super_node/btc/retriever.go
index 8dd3c1ae..6a285d91 100644
--- a/pkg/super_node/btc/retriever.go
+++ b/pkg/super_node/btc/retriever.go
@@ -15,3 +15,213 @@
// along with this program. If not, see .
package btc
+
+import (
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/jmoiron/sqlx"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vulcanize/vulcanizedb/pkg/eth/datastore/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+)
+
+// CIDRetriever satisfies the CIDRetriever interface for bitcoin
+type CIDRetriever struct {
+ db *postgres.DB
+}
+
+// NewCIDRetriever returns a pointer to a new CIDRetriever which supports the CIDRetriever interface
+func NewCIDRetriever(db *postgres.DB) *CIDRetriever {
+ return &CIDRetriever{
+ db: db,
+ }
+}
+
+// RetrieveFirstBlockNumber is used to retrieve the first block number in the db
+func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
+ var blockNumber int64
+ err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
+ return blockNumber, err
+}
+
+// RetrieveLastBlockNumber is used to retrieve the latest block number in the db
+func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
+ var blockNumber int64
+ err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
+ return blockNumber, err
+}
+
+// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
+func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) (shared.CIDsForFetching, bool, error) {
+ streamFilter, ok := filter.(*SubscriptionSettings)
+ if !ok {
+ return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
+ }
+ log.Debug("retrieving cids")
+ tx, err := ecr.db.Beginx()
+ if err != nil {
+ return nil, true, err
+ }
+
+ cw := new(CIDWrapper)
+ cw.BlockNumber = big.NewInt(blockNumber)
+ // Retrieve cached header CIDs
+ if !streamFilter.HeaderFilter.Off {
+ cw.Headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("header cid retrieval error")
+ return nil, true, err
+ }
+ }
+ // Retrieve cached trx CIDs
+ if !streamFilter.TxFilter.Off {
+ cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("transaction cid retrieval error")
+ return nil, true, err
+ }
+ }
+ trxIds := make([]int64, 0, len(cw.Transactions))
+ for _, tx := range cw.Transactions {
+ trxIds = append(trxIds, tx.ID)
+ }
+ return cw, empty(cw), tx.Commit()
+}
+
+func empty(cidWrapper *CIDWrapper) bool {
+ if len(cidWrapper.Transactions) > 0 || len(cidWrapper.Headers) > 0 {
+ return false
+ }
+ return true
+}
+
+// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
+func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
+ log.Debug("retrieving header cids for block ", blockNumber)
+ headers := make([]HeaderModel, 0)
+ pgStr := `SELECT * FROM btc.header_cids
+ WHERE block_number = $1`
+ return headers, tx.Select(&headers, pgStr, blockNumber)
+}
+
+// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
+// also returns the ids for the returned transaction cids
+func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, blockNumber int64) ([]TxModel, error) {
+ log.Debug("retrieving transaction cids for block ", blockNumber)
+ args := make([]interface{}, 0, 3)
+ results := make([]TxModel, 0)
+ pgStr := `SELECT transaction_cids.id, transaction_cids.header_id,
+ transaction_cids.tx_hash, transaction_cids.cid,
+ transaction_cids.dst, transaction_cids.src, transaction_cids.index
+ FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
+ WHERE header_cids.block_number = $1`
+ args = append(args, blockNumber)
+ return results, tx.Select(&results, pgStr, args...)
+}
+
+// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
+func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) {
+ pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids
+ LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1
+ LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number
+ WHERE r.block_number is NULL and fr.block_number IS NOT NULL
+ GROUP BY header_cids.block_number, r.block_number`
+ results := make([]struct {
+ Start uint64 `db:"start"`
+ Stop uint64 `db:"stop"`
+ }, 0)
+ err := ecr.db.Select(&results, pgStr)
+ if err != nil {
+ return nil, err
+ }
+ gaps := make([]shared.Gap, len(results))
+ for i, res := range results {
+ gaps[i] = shared.Gap{
+ Start: res.Start,
+ Stop: res.Stop,
+ }
+ }
+ return gaps, nil
+}
+
+// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
+func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) {
+ log.Debug("retrieving block cids for block hash ", blockHash.String())
+ tx, err := ecr.db.Beginx()
+ if err != nil {
+ return HeaderModel{}, nil, err
+ }
+ headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("header cid retrieval error")
+ return HeaderModel{}, nil, err
+ }
+ txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("tx cid retrieval error")
+ return HeaderModel{}, nil, err
+ }
+ return headerCID, txCIDs, tx.Commit()
+}
+
+// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
+func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) {
+ log.Debug("retrieving block cids for block number ", blockNumber)
+ tx, err := ecr.db.Beginx()
+ if err != nil {
+ return HeaderModel{}, nil, err
+ }
+ headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("header cid retrieval error")
+ return HeaderModel{}, nil, err
+ }
+ if len(headerCID) < 1 {
+ return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
+ }
+ txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
+ if err != nil {
+ if err := tx.Rollback(); err != nil {
+ log.Error(err)
+ }
+ log.Error("tx cid retrieval error")
+ return HeaderModel{}, nil, err
+ }
+ return headerCID[0], txCIDs, tx.Commit()
+}
+
+// RetrieveHeaderCIDByHash returns the header for the given block hash
+func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) {
+ log.Debug("retrieving header cids for block hash ", blockHash.String())
+ pgStr := `SELECT * FROM btc.header_cids
+ WHERE block_hash = $1`
+ var headerCID HeaderModel
+ return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
+}
+
+// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
+func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
+ log.Debug("retrieving tx cids for block id ", headerID)
+ pgStr := `SELECT * FROM btc.transaction_cids
+ WHERE header_id = $1`
+ var txCIDs []TxModel
+ return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
+}
diff --git a/pkg/super_node/btc/subscription_config.go b/pkg/super_node/btc/subscription_config.go
index 8e4a59b2..f95e3036 100644
--- a/pkg/super_node/btc/subscription_config.go
+++ b/pkg/super_node/btc/subscription_config.go
@@ -64,30 +64,36 @@ type HeaderFilter struct {
// TxFilter contains filter settings for txs
type TxFilter struct {
- Off bool
- Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to
- HasWitness bool
- WitnessHashes []string
+ Off bool
+ // Top level trx filters
+ Index int64 // allow filtering by index so that we can filter for only coinbase transactions (index 0) if we want to
+ Segwit bool // allow filtering for segwit trxs
+ WitnessHashes []string // allow filtering for specific witness hashes
+ // TODO: trx input filters
+ // TODO: trx output filters
}
// Init is used to initialize a EthSubscription struct with env variables
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
- sc.BackFill = viper.GetBool("superNode.ethSubscription.historicalData")
- sc.BackFillOnly = viper.GetBool("superNode.ethSubscription.historicalDataOnly")
+ sc.BackFill = viper.GetBool("superNode.btcSubscription.historicalData")
+ sc.BackFillOnly = viper.GetBool("superNode.btcSubscription.historicalDataOnly")
// Below default to 0
// 0 start means we start at the beginning and 0 end means we continue indefinitely
- sc.Start = big.NewInt(viper.GetInt64("superNode.ethSubscription.startingBlock"))
- sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock"))
+ sc.Start = big.NewInt(viper.GetInt64("superNode.btcSubscription.startingBlock"))
+ sc.End = big.NewInt(viper.GetInt64("superNode.btcSubscription.endingBlock"))
// Below default to false, which means we get all headers and no uncles by default
sc.HeaderFilter = HeaderFilter{
- Off: viper.GetBool("superNode.ethSubscription.off"),
+ Off: viper.GetBool("superNode.btcSubscription.headerFilter.off"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
sc.TxFilter = TxFilter{
- Off: viper.GetBool("superNode.ethSubscription.txFilter.off"),
+ Off: viper.GetBool("superNode.btcSubscription.txFilter.off"),
+ Index: viper.GetInt64("superNode.btcSubscription.txFilter.index"),
+ Segwit: viper.GetBool("superNode.btcSubscription.txFilter.segwit"),
+ WitnessHashes: viper.GetStringSlice("superNode.btcSubscription.txFilter.witnessHashes"),
}
return sc, nil
}
diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go
index e1d6c629..2f58fd82 100644
--- a/pkg/super_node/btc/types.go
+++ b/pkg/super_node/btc/types.go
@@ -20,8 +20,6 @@ import (
"encoding/json"
"math/big"
- "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
-
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/ipfs/go-block-format"
@@ -42,10 +40,6 @@ type IPLDPayload struct {
TxMetaData []TxModelWithInsAndOuts
}
-func (ip IPLDPayload) Value() shared.StreamedIPLDs {
- return ip
-}
-
// CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres
// Returned by IPLDPublisher
// Passed to CIDIndexer
@@ -72,10 +66,10 @@ type IPLDWrapper struct {
Transactions []blocks.Block
}
-// StreamPayload holds the data streamed from the super node eth service to the requesting clients
+// StreamResponse holds the data streamed from the super node eth service to the requesting clients
// Returned by IPLDResolver and ResponseFilterer
// Passed to client subscriptions
-type StreamPayload struct {
+type StreamResponse struct {
BlockNumber *big.Int `json:"blockNumber"`
SerializedHeaders [][]byte `json:"headerBytes"`
SerializedTxs [][]byte `json:"transactionBytes"`
@@ -84,20 +78,20 @@ type StreamPayload struct {
err error
}
-func (sd *StreamPayload) ensureEncoded() {
- if sd.encoded == nil && sd.err == nil {
- sd.encoded, sd.err = json.Marshal(sd)
+func (sr *StreamResponse) ensureEncoded() {
+ if sr.encoded == nil && sr.err == nil {
+ sr.encoded, sr.err = json.Marshal(sr)
}
}
// Length to implement Encoder interface for StateDiff
-func (sd *StreamPayload) Length() int {
- sd.ensureEncoded()
- return len(sd.encoded)
+func (sr *StreamResponse) Length() int {
+ sr.ensureEncoded()
+ return len(sr.encoded)
}
// Encode to implement Encoder interface for StateDiff
-func (sd *StreamPayload) Encode() ([]byte, error) {
- sd.ensureEncoded()
- return sd.encoded, sd.err
+func (sr *StreamResponse) Encode() ([]byte, error) {
+ sr.ensureEncoded()
+ return sr.encoded, sr.err
}
diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go
index b73cb34b..9500d717 100644
--- a/pkg/super_node/constructors.go
+++ b/pkg/super_node/constructors.go
@@ -18,6 +18,7 @@ package super_node
import (
"fmt"
+
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/rpcclient"
diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go
index 3fb23016..639cde50 100644
--- a/pkg/super_node/eth/filterer.go
+++ b/pkg/super_node/eth/filterer.go
@@ -106,7 +106,7 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *Stre
trxHashes := make([]common.Hash, 0, len(payload.Block.Body().Transactions))
if !trxFilter.Off {
for i, trx := range payload.Block.Body().Transactions {
- if checkTransactions(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
+ if checkTransactionAddrs(trxFilter.Src, trxFilter.Dst, payload.TxMetaData[i].Src, payload.TxMetaData[i].Dst) {
trxBuffer := new(bytes.Buffer)
if err := trx.EncodeRLP(trxBuffer); err != nil {
return nil, err
@@ -119,7 +119,8 @@ func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *Stre
return trxHashes, nil
}
-func checkTransactions(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool {
+// checkTransactionAddrs returns true if either the transaction src and dst are one of the wanted src and dst addresses
+func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst string) bool {
// If we aren't filtering for any addresses, every transaction is a go
if len(wantedDst) == 0 && len(wantedSrc) == 0 {
return true
@@ -185,21 +186,16 @@ func checkReceipts(rct *types.Receipt, wantedTopics, actualTopics [][]string, wa
return false
}
+// filterMatch returns true if the actualTopics conform to the wantedTopics filter
func filterMatch(wantedTopics, actualTopics [][]string) bool {
- // actualTopics should always be length 4, members could be nil slices though
- lenWantedTopics := len(wantedTopics)
+ // actualTopics should always be length 4, but the members can be nil slices
matches := 0
for i, actualTopicSet := range actualTopics {
- if i < lenWantedTopics {
+ if i < len(wantedTopics) && len(wantedTopics[i]) > 0 {
// If we have topics in this filter slot, count as a match if one of the topics matches
- if len(wantedTopics[i]) > 0 {
- matches += slicesShareString(actualTopicSet, wantedTopics[i])
- } else {
- // Filter slot is empty, not matching any topics at this slot => counts as a match
- matches++
- }
+ matches += slicesShareString(actualTopicSet, wantedTopics[i])
} else {
- // Filter slot doesn't exist, not matching any topics at this slot => count as a match
+ // Filter slot is either empty or doesn't exist => not matching any topics at this slot => counts as a match
matches++
}
}
diff --git a/pkg/super_node/eth/subscription_config.go b/pkg/super_node/eth/subscription_config.go
index 368d461a..f9c77496 100644
--- a/pkg/super_node/eth/subscription_config.go
+++ b/pkg/super_node/eth/subscription_config.go
@@ -86,8 +86,8 @@ func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc.End = big.NewInt(viper.GetInt64("superNode.ethSubscription.endingBlock"))
// Below default to false, which means we get all headers and no uncles by default
sc.HeaderFilter = HeaderFilter{
- Off: viper.GetBool("superNode.ethSubscription.off"),
- Uncles: viper.GetBool("superNode.ethSubscription.uncles"),
+ Off: viper.GetBool("superNode.ethSubscription.headerFilter.off"),
+ Uncles: viper.GetBool("superNode.ethSubscription.headerFilter.uncles"),
}
// Below defaults to false and two slices of length 0
// Which means we get all transactions by default
diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go
index 81729381..e30d8ce4 100644
--- a/pkg/super_node/eth/types.go
+++ b/pkg/super_node/eth/types.go
@@ -20,8 +20,6 @@ import (
"encoding/json"
"math/big"
- "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-block-format"
@@ -40,10 +38,6 @@ type IPLDPayload struct {
StorageNodes map[common.Hash][]TrieNode
}
-func (ip IPLDPayload) Value() shared.StreamedIPLDs {
- return ip
-}
-
// Trie struct used to flag node as leaf or not
type TrieNode struct {
Key common.Hash
@@ -105,24 +99,20 @@ type StreamResponse struct {
err error
}
-func (sr StreamResponse) Value() shared.ServerResponse {
- return sr
-}
-
-func (sd *StreamResponse) ensureEncoded() {
- if sd.encoded == nil && sd.err == nil {
- sd.encoded, sd.err = json.Marshal(sd)
+func (sr *StreamResponse) ensureEncoded() {
+ if sr.encoded == nil && sr.err == nil {
+ sr.encoded, sr.err = json.Marshal(sr)
}
}
// Length to implement Encoder interface for StateDiff
-func (sd *StreamResponse) Length() int {
- sd.ensureEncoded()
- return len(sd.encoded)
+func (sr *StreamResponse) Length() int {
+ sr.ensureEncoded()
+ return len(sr.encoded)
}
// Encode to implement Encoder interface for StateDiff
-func (sd *StreamResponse) Encode() ([]byte, error) {
- sd.ensureEncoded()
- return sd.encoded, sd.err
+func (sr *StreamResponse) Encode() ([]byte, error) {
+ sr.ensureEncoded()
+ return sr.encoded, sr.err
}
diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go
index c650533f..31756092 100644
--- a/pkg/super_node/service.go
+++ b/pkg/super_node/service.go
@@ -204,12 +204,12 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
}
// If we have a ScreenAndServe process running, forward the iplds to it
select {
- case screenAndServePayload <- ipldPayload.Value():
+ case screenAndServePayload <- ipldPayload:
default:
}
// Forward the payload to the publishAndIndex workers
select {
- case publishAndIndexPayload <- ipldPayload.Value():
+ case publishAndIndexPayload <- ipldPayload:
default:
}
case err := <-sub.Err():
@@ -296,7 +296,7 @@ func (sap *Service) sendResponse(payload shared.StreamedIPLDs) {
}
for id, sub := range subs {
select {
- case sub.PayloadChan <- SubscriptionPayload{response.Value(), ""}:
+ case sub.PayloadChan <- SubscriptionPayload{response, ""}:
log.Infof("sending super node payload to subscription %s", id)
default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
@@ -392,7 +392,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.Subscrip
continue
}
select {
- case sub.PayloadChan <- SubscriptionPayload{backFillIplds.Value(), ""}:
+ case sub.PayloadChan <- SubscriptionPayload{backFillIplds, ""}:
log.Infof("sending super node historical data payload to subscription %s", id)
default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
diff --git a/pkg/super_node/shared/types.go b/pkg/super_node/shared/types.go
index 5045215e..00719dcd 100644
--- a/pkg/super_node/shared/types.go
+++ b/pkg/super_node/shared/types.go
@@ -19,12 +19,8 @@ package shared
// These types serve as very loose wrappers around a generic underlying interface{}
type RawChainData interface{}
-// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values
-// stored at that memory location and not a copy of the pointer itself.
-// We want to avoid sending a pointer to publishAndIndex and screenAndServe channels; sharing memory across these processes
-type StreamedIPLDs interface {
- Value() StreamedIPLDs
-}
+// The concrete type underneath StreamedIPLDs should not be a pointer
+type StreamedIPLDs interface{}
type CIDsForIndexing interface{}
@@ -32,12 +28,8 @@ type CIDsForFetching interface{}
type FetchedIPLDs interface{}
-// The concrete type underneath StreamedIPLDs can be a pointer only if the Value() method returns a copy of the values
-// stored at that memory location and not a copy of the pointer itself.
-// We want to avoid sending a pointer to subscription channels; sharing memory across all subscriptions
-type ServerResponse interface {
- Value() ServerResponse
-}
+// The concrete type underneath StreamedIPLDs should not be a pointer
+type ServerResponse interface{}
type Gap struct {
Start uint64