diff --git a/libraries/shared/streamer/super_node_streamer.go b/libraries/shared/streamer/super_node_streamer.go index a6bbf7b8..26c9479a 100644 --- a/libraries/shared/streamer/super_node_streamer.go +++ b/libraries/shared/streamer/super_node_streamer.go @@ -25,12 +25,7 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) -// ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node -type ISuperNodeStreamer interface { - Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) -} - -// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface +// SuperNodeStreamer is the underlying struct for the shared.SuperNodeStreamer interface type SuperNodeStreamer struct { Client core.RPCClient } diff --git a/pkg/super_node/btc/converter_test.go b/pkg/super_node/btc/converter_test.go index c5ad628d..142f511f 100644 --- a/pkg/super_node/btc/converter_test.go +++ b/pkg/super_node/btc/converter_test.go @@ -34,7 +34,7 @@ var _ = Describe("Converter", func() { convertedPayload, ok := payload.(btc.IPLDPayload) Expect(ok).To(BeTrue()) Expect(convertedPayload).To(Equal(mocks.MockIPLDPayload)) - Expect(convertedPayload.Height).To(Equal(mocks.MockBlockHeight)) + Expect(convertedPayload.BlockHeight).To(Equal(mocks.MockBlockHeight)) Expect(convertedPayload.Header).To(Equal(&mocks.MockBlock.Header)) Expect(convertedPayload.Txs).To(Equal(mocks.MockTransactions)) Expect(convertedPayload.TxMetaData).To(Equal(mocks.MockTxsMetaData)) diff --git a/pkg/super_node/btc/filterer.go b/pkg/super_node/btc/filterer.go index b9b6a78d..e37d1d80 100644 --- a/pkg/super_node/btc/filterer.go +++ b/pkg/super_node/btc/filterer.go @@ -42,7 +42,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh if !ok { return StreamResponse{}, fmt.Errorf("btc filterer expected payload type %T got %T", IPLDPayload{}, payload) } - height := int64(btcPayload.Height) + height := int64(btcPayload.BlockPayload.BlockHeight) if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) { response := new(StreamResponse) if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil { diff --git a/pkg/super_node/btc/http_streamer.go b/pkg/super_node/btc/http_streamer.go index 67dd3261..a75caf91 100644 --- a/pkg/super_node/btc/http_streamer.go +++ b/pkg/super_node/btc/http_streamer.go @@ -75,9 +75,9 @@ func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (sha continue } payloadChan <- BlockPayload{ - Header: &block.Header, - Height: height, - Txs: msgTxsToUtilTxs(block.Transactions), + Header: &block.Header, + BlockHeight: height, + Txs: msgTxsToUtilTxs(block.Transactions), } default: } diff --git a/pkg/super_node/btc/mocks/test_data.go b/pkg/super_node/btc/mocks/test_data.go index 8e3de6be..72193b11 100644 --- a/pkg/super_node/btc/mocks/test_data.go +++ b/pkg/super_node/btc/mocks/test_data.go @@ -229,9 +229,9 @@ var ( btcutil.NewTx(MockBlock.Transactions[2]), } MockBlockPayload = btc.BlockPayload{ - Header: &MockBlock.Header, - Txs: MockTransactions, - Height: MockBlockHeight, + Header: &MockBlock.Header, + Txs: MockTransactions, + BlockHeight: MockBlockHeight, } sClass1, addresses1, numOfSigs1, _ = txscript.ExtractPkScriptAddrs([]byte{ 0x41, // OP_DATA_65 diff --git a/pkg/super_node/btc/payload_fetcher.go b/pkg/super_node/btc/payload_fetcher.go index d8290d10..3582bd90 100644 --- a/pkg/super_node/btc/payload_fetcher.go +++ b/pkg/super_node/btc/payload_fetcher.go @@ -55,9 +55,9 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain return nil, err } blockPayloads[i] = BlockPayload{ - Height: int64(height), - Header: &block.Header, - Txs: msgTxsToUtilTxs(block.Transactions), + BlockHeight: int64(height), + Header: &block.Header, + Txs: msgTxsToUtilTxs(block.Transactions), } } return blockPayloads, nil diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index f010217c..90159bd6 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -61,7 +61,7 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI header := HeaderModel{ CID: headerCid, ParentHash: ipldPayload.Header.PrevBlock.String(), - BlockNumber: strconv.Itoa(int(ipldPayload.Height)), + BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)), BlockHash: ipldPayload.Header.BlockHash().String(), Timestamp: ipldPayload.Header.Timestamp.UnixNano(), Bits: ipldPayload.Header.Bits, diff --git a/pkg/super_node/btc/streamer.go b/pkg/super_node/btc/streamer.go index 8a212e68..9c5f4839 100644 --- a/pkg/super_node/btc/streamer.go +++ b/pkg/super_node/btc/streamer.go @@ -49,9 +49,9 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared. // Notification handler for block connections, forwards new block data to the payloadChan OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) { payloadChan <- BlockPayload{ - Height: int64(height), - Header: header, - Txs: txs, + BlockHeight: int64(height), + Header: header, + Txs: txs, } }, } diff --git a/pkg/super_node/btc/types.go b/pkg/super_node/btc/types.go index ece70479..966a954d 100644 --- a/pkg/super_node/btc/types.go +++ b/pkg/super_node/btc/types.go @@ -27,9 +27,9 @@ import ( // BlockPayload packages the block and tx data received from block connection notifications type BlockPayload struct { - Height int64 - Header *wire.BlockHeader - Txs []*btcutil.Tx + BlockHeight int64 + Header *wire.BlockHeader + Txs []*btcutil.Tx } // IPLDPayload is a custom type which packages raw BTC data for publishing to IPFS and filtering to subscribers @@ -40,6 +40,11 @@ type IPLDPayload struct { TxMetaData []TxModelWithInsAndOuts } +// Height satisfies the StreamedIPLDs interface +func (i IPLDPayload) Height() int64 { + return i.BlockPayload.BlockHeight +} + // CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres // Returned by IPLDPublisher // Passed to CIDIndexer @@ -78,6 +83,11 @@ type StreamResponse struct { err error } +// Height satisfies the ServerResponse interface +func (sr StreamResponse) Height() int64 { + return sr.BlockNumber.Int64() +} + func (sr *StreamResponse) ensureEncoded() { if sr.encoded == nil && sr.err == nil { sr.encoded, sr.err = json.Marshal(sr) diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index e30d8ce4..9b4c8314 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -38,6 +38,11 @@ type IPLDPayload struct { StorageNodes map[common.Hash][]TrieNode } +// Height satisfies the StreamedIPLDs interface +func (i IPLDPayload) Height() int64 { + return i.Block.Number().Int64() +} + // Trie struct used to flag node as leaf or not type TrieNode struct { Key common.Hash @@ -99,6 +104,11 @@ type StreamResponse struct { err error } +// Height satisfies the ServerResponse interface +func (sr StreamResponse) Height() int64 { + return sr.BlockNumber.Int64() +} + func (sr *StreamResponse) ensureEncoded() { if sr.encoded == nil && sr.err == nil { sr.encoded, sr.err = json.Marshal(sr) diff --git a/pkg/super_node/helpers.go b/pkg/super_node/helpers.go index 770f0fae..4a4b077b 100644 --- a/pkg/super_node/helpers.go +++ b/pkg/super_node/helpers.go @@ -21,7 +21,7 @@ import log "github.com/sirupsen/logrus" func sendNonBlockingErr(sub Subscription, err error) { log.Error(err) select { - case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Msg: ""}: + case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Flag: EmptyFlag}: default: log.Infof("unable to send error to subscription %s", sub.ID) } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 2ddb7c3e..3ef2002c 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -272,6 +272,12 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) { sap.closeType(ty) continue } + if subConfig.EndingBlock().Int64() > 0 && subConfig.EndingBlock().Int64() < payload.Height() { + // We are not out of range for this subscription type + // close it, and continue to the next + sap.closeType(ty) + continue + } response, err := sap.Filterer.Filter(subConfig, payload) if err != nil { log.Error(err) @@ -280,7 +286,7 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) { } for id, sub := range subs { select { - case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Msg: ""}: + case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Flag: EmptyFlag}: log.Debugf("sending super node payload to subscription %s", id) default: log.Infof("unable to send payload to subscription %s; channel has no receiver", id) @@ -312,15 +318,6 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha return } subscriptionType := crypto.Keccak256Hash(by) - // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data - // Otherwise we only filter new data as it is streamed in from the state diffing geth node - if params.HistoricalData() || params.HistoricalDataOnly() { - if err := sap.sendHistoricalData(subscription, id, params); err != nil { - sendNonBlockingErr(subscription, err) - sendNonBlockingQuit(subscription) - return - } - } if !params.HistoricalDataOnly() { // Add subscriber sap.Lock() @@ -331,6 +328,15 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha sap.SubscriptionTypes[subscriptionType] = params sap.Unlock() } + // If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data + // Otherwise we only filter new data as it is streamed in from the state diffing geth node + if params.HistoricalData() || params.HistoricalDataOnly() { + if err := sap.sendHistoricalData(subscription, id, params); err != nil { + sendNonBlockingErr(subscription, err) + sendNonBlockingQuit(subscription) + return + } + } } // sendHistoricalData sends historical data to the requesting subscription @@ -377,7 +383,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share continue } select { - case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Msg: ""}: + case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Flag: EmptyFlag}: log.Debugf("sending super node historical data payload to subscription %s", id) default: log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) @@ -385,7 +391,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share } // when we are done backfilling send an empty payload signifying so in the msg select { - case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Msg: "BACKFILL COMPLETE"}: + case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: log.Debugf("sending backfill completion notice to subscription %s", id) default: log.Infof("unable to send backfill completion notice to subscription %s", id) diff --git a/pkg/super_node/shared/types.go b/pkg/super_node/shared/types.go index 00719dcd..f89a7a74 100644 --- a/pkg/super_node/shared/types.go +++ b/pkg/super_node/shared/types.go @@ -20,7 +20,9 @@ package shared type RawChainData interface{} // The concrete type underneath StreamedIPLDs should not be a pointer -type StreamedIPLDs interface{} +type StreamedIPLDs interface { + Height() int64 +} type CIDsForIndexing interface{} @@ -29,7 +31,9 @@ type CIDsForFetching interface{} type FetchedIPLDs interface{} // The concrete type underneath StreamedIPLDs should not be a pointer -type ServerResponse interface{} +type ServerResponse interface { + Height() int64 +} type Gap struct { Start uint64 diff --git a/pkg/super_node/subscription.go b/pkg/super_node/subscription.go index 8dabf757..fc2cb0ec 100644 --- a/pkg/super_node/subscription.go +++ b/pkg/super_node/subscription.go @@ -17,10 +17,19 @@ package super_node import ( + "errors" + "github.com/ethereum/go-ethereum/rpc" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) +type Flag int32 + +const ( + EmptyFlag Flag = iota + BackFillCompleteFlag +) + // Subscription holds the information for an individual client subscription to the super node type Subscription struct { ID rpc.ID @@ -33,5 +42,19 @@ type Subscription struct { type SubscriptionPayload struct { Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload Err string `json:"err"` // field for error - Msg string `json:"msg"` // field for message + Flag Flag `json:"flag"` // field for message +} + +func (sp SubscriptionPayload) Error() error { + if sp.Err == "" { + return nil + } + return errors.New(sp.Err) +} + +func (sp SubscriptionPayload) BackFillComplete() bool { + if sp.Flag == BackFillCompleteFlag { + return true + } + return false }