more explicity payload err and msg

This commit is contained in:
Ian Norden 2020-02-19 16:09:33 -06:00
parent 2fbf97da9d
commit 94aefafd7c
14 changed files with 88 additions and 40 deletions

View File

@ -25,12 +25,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// ISuperNodeStreamer is the interface for streaming SuperNodePayloads from a vulcanizeDB super node // SuperNodeStreamer is the underlying struct for the shared.SuperNodeStreamer interface
type ISuperNodeStreamer interface {
Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error)
}
// SuperNodeStreamer is the underlying struct for the ISuperNodeStreamer interface
type SuperNodeStreamer struct { type SuperNodeStreamer struct {
Client core.RPCClient Client core.RPCClient
} }

View File

@ -34,7 +34,7 @@ var _ = Describe("Converter", func() {
convertedPayload, ok := payload.(btc.IPLDPayload) convertedPayload, ok := payload.(btc.IPLDPayload)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())
Expect(convertedPayload).To(Equal(mocks.MockIPLDPayload)) Expect(convertedPayload).To(Equal(mocks.MockIPLDPayload))
Expect(convertedPayload.Height).To(Equal(mocks.MockBlockHeight)) Expect(convertedPayload.BlockHeight).To(Equal(mocks.MockBlockHeight))
Expect(convertedPayload.Header).To(Equal(&mocks.MockBlock.Header)) Expect(convertedPayload.Header).To(Equal(&mocks.MockBlock.Header))
Expect(convertedPayload.Txs).To(Equal(mocks.MockTransactions)) Expect(convertedPayload.Txs).To(Equal(mocks.MockTransactions))
Expect(convertedPayload.TxMetaData).To(Equal(mocks.MockTxsMetaData)) Expect(convertedPayload.TxMetaData).To(Equal(mocks.MockTxsMetaData))

View File

@ -42,7 +42,7 @@ func (s *ResponseFilterer) Filter(filter shared.SubscriptionSettings, payload sh
if !ok { if !ok {
return StreamResponse{}, fmt.Errorf("btc filterer expected payload type %T got %T", IPLDPayload{}, payload) return StreamResponse{}, fmt.Errorf("btc filterer expected payload type %T got %T", IPLDPayload{}, payload)
} }
height := int64(btcPayload.Height) height := int64(btcPayload.BlockPayload.BlockHeight)
if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) { if checkRange(btcFilters.Start.Int64(), btcFilters.End.Int64(), height) {
response := new(StreamResponse) response := new(StreamResponse)
if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil { if err := s.filterHeaders(btcFilters.HeaderFilter, response, btcPayload); err != nil {

View File

@ -76,7 +76,7 @@ func (ps *HTTPPayloadStreamer) Stream(payloadChan chan shared.RawChainData) (sha
} }
payloadChan <- BlockPayload{ payloadChan <- BlockPayload{
Header: &block.Header, Header: &block.Header,
Height: height, BlockHeight: height,
Txs: msgTxsToUtilTxs(block.Transactions), Txs: msgTxsToUtilTxs(block.Transactions),
} }
default: default:

View File

@ -231,7 +231,7 @@ var (
MockBlockPayload = btc.BlockPayload{ MockBlockPayload = btc.BlockPayload{
Header: &MockBlock.Header, Header: &MockBlock.Header,
Txs: MockTransactions, Txs: MockTransactions,
Height: MockBlockHeight, BlockHeight: MockBlockHeight,
} }
sClass1, addresses1, numOfSigs1, _ = txscript.ExtractPkScriptAddrs([]byte{ sClass1, addresses1, numOfSigs1, _ = txscript.ExtractPkScriptAddrs([]byte{
0x41, // OP_DATA_65 0x41, // OP_DATA_65

View File

@ -55,7 +55,7 @@ func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChain
return nil, err return nil, err
} }
blockPayloads[i] = BlockPayload{ blockPayloads[i] = BlockPayload{
Height: int64(height), BlockHeight: int64(height),
Header: &block.Header, Header: &block.Header,
Txs: msgTxsToUtilTxs(block.Transactions), Txs: msgTxsToUtilTxs(block.Transactions),
} }

View File

@ -61,7 +61,7 @@ func (pub *IPLDPublisher) Publish(payload shared.StreamedIPLDs) (shared.CIDsForI
header := HeaderModel{ header := HeaderModel{
CID: headerCid, CID: headerCid,
ParentHash: ipldPayload.Header.PrevBlock.String(), ParentHash: ipldPayload.Header.PrevBlock.String(),
BlockNumber: strconv.Itoa(int(ipldPayload.Height)), BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)),
BlockHash: ipldPayload.Header.BlockHash().String(), BlockHash: ipldPayload.Header.BlockHash().String(),
Timestamp: ipldPayload.Header.Timestamp.UnixNano(), Timestamp: ipldPayload.Header.Timestamp.UnixNano(),
Bits: ipldPayload.Header.Bits, Bits: ipldPayload.Header.Bits,

View File

@ -49,7 +49,7 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
// Notification handler for block connections, forwards new block data to the payloadChan // Notification handler for block connections, forwards new block data to the payloadChan
OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) { OnFilteredBlockConnected: func(height int32, header *wire.BlockHeader, txs []*btcutil.Tx) {
payloadChan <- BlockPayload{ payloadChan <- BlockPayload{
Height: int64(height), BlockHeight: int64(height),
Header: header, Header: header,
Txs: txs, Txs: txs,
} }

View File

@ -27,7 +27,7 @@ import (
// BlockPayload packages the block and tx data received from block connection notifications // BlockPayload packages the block and tx data received from block connection notifications
type BlockPayload struct { type BlockPayload struct {
Height int64 BlockHeight int64
Header *wire.BlockHeader Header *wire.BlockHeader
Txs []*btcutil.Tx Txs []*btcutil.Tx
} }
@ -40,6 +40,11 @@ type IPLDPayload struct {
TxMetaData []TxModelWithInsAndOuts TxMetaData []TxModelWithInsAndOuts
} }
// Height satisfies the StreamedIPLDs interface
func (i IPLDPayload) Height() int64 {
return i.BlockPayload.BlockHeight
}
// CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres // CIDPayload is a struct to hold all the CIDs and their associated meta data for indexing in Postgres
// Returned by IPLDPublisher // Returned by IPLDPublisher
// Passed to CIDIndexer // Passed to CIDIndexer
@ -78,6 +83,11 @@ type StreamResponse struct {
err error err error
} }
// Height satisfies the ServerResponse interface
func (sr StreamResponse) Height() int64 {
return sr.BlockNumber.Int64()
}
func (sr *StreamResponse) ensureEncoded() { func (sr *StreamResponse) ensureEncoded() {
if sr.encoded == nil && sr.err == nil { if sr.encoded == nil && sr.err == nil {
sr.encoded, sr.err = json.Marshal(sr) sr.encoded, sr.err = json.Marshal(sr)

View File

@ -38,6 +38,11 @@ type IPLDPayload struct {
StorageNodes map[common.Hash][]TrieNode StorageNodes map[common.Hash][]TrieNode
} }
// Height satisfies the StreamedIPLDs interface
func (i IPLDPayload) Height() int64 {
return i.Block.Number().Int64()
}
// Trie struct used to flag node as leaf or not // Trie struct used to flag node as leaf or not
type TrieNode struct { type TrieNode struct {
Key common.Hash Key common.Hash
@ -99,6 +104,11 @@ type StreamResponse struct {
err error err error
} }
// Height satisfies the ServerResponse interface
func (sr StreamResponse) Height() int64 {
return sr.BlockNumber.Int64()
}
func (sr *StreamResponse) ensureEncoded() { func (sr *StreamResponse) ensureEncoded() {
if sr.encoded == nil && sr.err == nil { if sr.encoded == nil && sr.err == nil {
sr.encoded, sr.err = json.Marshal(sr) sr.encoded, sr.err = json.Marshal(sr)

View File

@ -21,7 +21,7 @@ import log "github.com/sirupsen/logrus"
func sendNonBlockingErr(sub Subscription, err error) { func sendNonBlockingErr(sub Subscription, err error) {
log.Error(err) log.Error(err)
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Msg: ""}: case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Flag: EmptyFlag}:
default: default:
log.Infof("unable to send error to subscription %s", sub.ID) log.Infof("unable to send error to subscription %s", sub.ID)
} }

View File

@ -272,6 +272,12 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) {
sap.closeType(ty) sap.closeType(ty)
continue continue
} }
if subConfig.EndingBlock().Int64() > 0 && subConfig.EndingBlock().Int64() < payload.Height() {
// We are not out of range for this subscription type
// close it, and continue to the next
sap.closeType(ty)
continue
}
response, err := sap.Filterer.Filter(subConfig, payload) response, err := sap.Filterer.Filter(subConfig, payload)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -280,7 +286,7 @@ func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) {
} }
for id, sub := range subs { for id, sub := range subs {
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Msg: ""}: case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Flag: EmptyFlag}:
log.Debugf("sending super node payload to subscription %s", id) log.Debugf("sending super node payload to subscription %s", id)
default: default:
log.Infof("unable to send payload to subscription %s; channel has no receiver", id) log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
@ -312,15 +318,6 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
return return
} }
subscriptionType := crypto.Keccak256Hash(by) subscriptionType := crypto.Keccak256Hash(by)
// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.HistoricalData() || params.HistoricalDataOnly() {
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, err)
sendNonBlockingQuit(subscription)
return
}
}
if !params.HistoricalDataOnly() { if !params.HistoricalDataOnly() {
// Add subscriber // Add subscriber
sap.Lock() sap.Lock()
@ -331,6 +328,15 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
sap.SubscriptionTypes[subscriptionType] = params sap.SubscriptionTypes[subscriptionType] = params
sap.Unlock() sap.Unlock()
} }
// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
if params.HistoricalData() || params.HistoricalDataOnly() {
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
sendNonBlockingErr(subscription, err)
sendNonBlockingQuit(subscription)
return
}
}
} }
// sendHistoricalData sends historical data to the requesting subscription // sendHistoricalData sends historical data to the requesting subscription
@ -377,7 +383,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
continue continue
} }
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Msg: ""}: case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Flag: EmptyFlag}:
log.Debugf("sending super node historical data payload to subscription %s", id) log.Debugf("sending super node historical data payload to subscription %s", id)
default: default:
log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id) log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
@ -385,7 +391,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
} }
// when we are done backfilling send an empty payload signifying so in the msg // when we are done backfilling send an empty payload signifying so in the msg
select { select {
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Msg: "BACKFILL COMPLETE"}: case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
log.Debugf("sending backfill completion notice to subscription %s", id) log.Debugf("sending backfill completion notice to subscription %s", id)
default: default:
log.Infof("unable to send backfill completion notice to subscription %s", id) log.Infof("unable to send backfill completion notice to subscription %s", id)

View File

@ -20,7 +20,9 @@ package shared
type RawChainData interface{} type RawChainData interface{}
// The concrete type underneath StreamedIPLDs should not be a pointer // The concrete type underneath StreamedIPLDs should not be a pointer
type StreamedIPLDs interface{} type StreamedIPLDs interface {
Height() int64
}
type CIDsForIndexing interface{} type CIDsForIndexing interface{}
@ -29,7 +31,9 @@ type CIDsForFetching interface{}
type FetchedIPLDs interface{} type FetchedIPLDs interface{}
// The concrete type underneath StreamedIPLDs should not be a pointer // The concrete type underneath StreamedIPLDs should not be a pointer
type ServerResponse interface{} type ServerResponse interface {
Height() int64
}
type Gap struct { type Gap struct {
Start uint64 Start uint64

View File

@ -17,10 +17,19 @@
package super_node package super_node
import ( import (
"errors"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
type Flag int32
const (
EmptyFlag Flag = iota
BackFillCompleteFlag
)
// Subscription holds the information for an individual client subscription to the super node // Subscription holds the information for an individual client subscription to the super node
type Subscription struct { type Subscription struct {
ID rpc.ID ID rpc.ID
@ -33,5 +42,19 @@ type Subscription struct {
type SubscriptionPayload struct { type SubscriptionPayload struct {
Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload
Err string `json:"err"` // field for error Err string `json:"err"` // field for error
Msg string `json:"msg"` // field for message Flag Flag `json:"flag"` // field for message
}
func (sp SubscriptionPayload) Error() error {
if sp.Err == "" {
return nil
}
return errors.New(sp.Err)
}
func (sp SubscriptionPayload) BackFillComplete() bool {
if sp.Flag == BackFillCompleteFlag {
return true
}
return false
} }