diff --git a/cmd/superNode.go b/cmd/superNode.go
index 0ba43996..412291db 100644
--- a/cmd/superNode.go
+++ b/cmd/superNode.go
@@ -71,13 +71,13 @@ func superNode() {
 	var forwardPayloadChan chan shared.StreamedIPLDs
 	if superNodeConfig.Serve {
 		forwardPayloadChan = make(chan shared.StreamedIPLDs, super_node.PayloadChanBufferSize)
-		superNode.ScreenAndServe(wg, forwardPayloadChan)
+		superNode.FilterAndServe(wg, forwardPayloadChan)
 		if err := startServers(superNode, superNodeConfig); err != nil {
 			logWithCommand.Fatal(err)
 		}
 	}
 	if superNodeConfig.Sync {
-		if err := superNode.SyncAndPublish(wg, forwardPayloadChan); err != nil {
+		if err := superNode.ProcessData(wg, forwardPayloadChan); err != nil {
 			logWithCommand.Fatal(err)
 		}
 	}
@@ -86,7 +86,7 @@ func superNode() {
 		if err != nil {
 			logWithCommand.Fatal(err)
 		}
-		backFiller.FillGaps(wg)
+		backFiller.FillGapsInSuperNode(wg)
 		}
 	}
 	wg.Wait()
diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go
index 4116e09b..1ff8dd7d 100644
--- a/pkg/super_node/backfiller.go
+++ b/pkg/super_node/backfiller.go
@@ -36,7 +36,7 @@ const (
 // BackFillInterface for filling in gaps in the super node
 type BackFillInterface interface {
 	// Method for the super node to periodically check for and fill in gaps in its data using an archival node
-	FillGaps(wg *sync.WaitGroup)
+	FillGapsInSuperNode(wg *sync.WaitGroup)
 }
 
 // BackFillService for filling in gaps in the super node
@@ -100,9 +100,8 @@ func NewBackFillService(settings *shared.SuperNodeConfig, screenAndServeChan cha
 	}, nil
 }
 
-// FillGaps periodically checks for and fills in gaps in the super node db
-// this requires a core.RpcClient that is pointed at an archival node with the StateDiffAt method exposed
-func (bfs *BackFillService) FillGaps(wg *sync.WaitGroup) {
+// FillGapsInSuperNode periodically checks for and fills in gaps in the super node db
+func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
 	ticker := time.NewTicker(bfs.GapCheckFrequency)
 	wg.Add(1)
 
diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go
index 833adce8..7facc74c 100644
--- a/pkg/super_node/backfiller_test.go
+++ b/pkg/super_node/backfiller_test.go
@@ -70,7 +70,7 @@ var _ = Describe("BackFiller", func() {
 				QuitChan:   quitChan,
 			}
 			wg := &sync.WaitGroup{}
-			backfiller.FillGaps(wg)
+			backfiller.FillGapsInSuperNode(wg)
 			time.Sleep(time.Second * 3)
 			quitChan <- true
 			Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
@@ -124,7 +124,7 @@ var _ = Describe("BackFiller", func() {
 				QuitChan:   quitChan,
 			}
 			wg := &sync.WaitGroup{}
-			backfiller.FillGaps(wg)
+			backfiller.FillGapsInSuperNode(wg)
 			time.Sleep(time.Second * 3)
 			quitChan <- true
 			Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1))
@@ -172,7 +172,7 @@ var _ = Describe("BackFiller", func() {
 				QuitChan:   quitChan,
 			}
 			wg := &sync.WaitGroup{}
-			backfiller.FillGaps(wg)
+			backfiller.FillGapsInSuperNode(wg)
 			time.Sleep(time.Second * 3)
 			quitChan <- true
 			Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2))
diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go
index a6c614c6..3dc22ac2 100644
--- a/pkg/super_node/eth/converter.go
+++ b/pkg/super_node/eth/converter.go
@@ -19,13 +19,13 @@ package eth
 import (
 	"fmt"
 
-	"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
-
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/params"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/statediff"
+
+	"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
 )
 
 // PayloadConverter satisfies the PayloadConverter interface for ethereum
diff --git a/pkg/super_node/helpers.go b/pkg/super_node/helpers.go
index 50f80afd..770f0fae 100644
--- a/pkg/super_node/helpers.go
+++ b/pkg/super_node/helpers.go
@@ -21,7 +21,7 @@ import log "github.com/sirupsen/logrus"
 func sendNonBlockingErr(sub Subscription, err error) {
 	log.Error(err)
 	select {
-	case sub.PayloadChan <- SubscriptionPayload{nil, err.Error()}:
+	case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: err.Error(), Msg: ""}:
 	default:
 		log.Infof("unable to send error to subscription %s", sub.ID)
 	}
diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go
index 943cc275..2ddb7c3e 100644
--- a/pkg/super_node/service.go
+++ b/pkg/super_node/service.go
@@ -43,15 +43,15 @@ const (
 type SuperNode interface {
 	// APIs(), Protocols(), Start() and Stop()
 	node.Service
-	// Main event loop for syncAndPublish processes
-	SyncAndPublish(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error
-	// Main event loop for handling client pub-sub
-	ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs)
-	// Method to subscribe to receive state diff processing output
+	// Data processing event loop
+	ProcessData(wg *sync.WaitGroup, forwardPayloadChan chan<- shared.StreamedIPLDs) error
+	// Pub-Sub handling event loop
+	FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs)
+	// Method to subscribe to the service
 	Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings)
-	// Method to unsubscribe from state diff processing
+	// Method to unsubscribe from the service
 	Unsubscribe(id rpc.ID)
-	// Method to access the node info for this service
+	// Method to access the node info for the service
 	Node() core.Node
 }
 
@@ -171,10 +171,11 @@ func (sap *Service) APIs() []rpc.API {
 	return append(apis, chainAPI)
 }
-// SyncAndPublish is the backend processing loop which streams data, converts it to iplds, publishes them to ipfs, and indexes their cids
-// This continues on no matter if or how many subscribers there are, it then forwards the data to the ScreenAndServe() loop
-// which filters and sends relevant data to client subscriptions, if there are any
-func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error {
+// ProcessData streams incoming raw chain data and converts it for further processing
+// It forwards the converted data to the publishAndIndex process(es) it spins up
+// It also forwards the converted data to the FilterAndServe process if there is one listening on the passed screenAndServePayload channel
+// This continues regardless of whether or how many subscribers there are
+func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- shared.StreamedIPLDs) error {
 	sub, err := sap.Streamer.Stream(sap.PayloadChan)
 	if err != nil {
 		return err
 	}
@@ -213,10 +214,12 @@ func (sap *Service) SyncAndPublish(wg *sync.WaitGroup, screenAndServePayload cha
 			}
 		}
 	}()
-	log.Info("syncAndPublish goroutine successfully spun up")
+	log.Info("ProcessData goroutine successfully spun up")
 	return nil
 }
 
+// publishAndIndex is spun up by ProcessData and receives converted chain data from that process
+// It publishes this data to IPFS and indexes the CIDs with useful metadata in Postgres
 func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.StreamedIPLDs) {
 	go func() {
 		for {
@@ -233,18 +236,20 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
 			}
 		}
 	}()
-	log.Info("publishAndIndex goroutine successfully spun up")
+	log.Debugf("publishAndIndex goroutine successfully spun up")
 }
 
-// ScreenAndServe is the loop used to screen data streamed from the state diffing eth node
-// and send the appropriate portions of it to a requesting client subscription, according to their subscription configuration
-func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) {
+// FilterAndServe listens for incoming converted data on the screenAndServePayload channel from the ProcessData process
+// It filters and sends this data to any subscribers to the service
+// This process can be stood up alone, without a screenAndServePayload channel attached to a ProcessData process,
+// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
+func (sap *Service) FilterAndServe(wg *sync.WaitGroup, screenAndServePayload <-chan shared.StreamedIPLDs) {
 	wg.Add(1)
 	go func() {
 		for {
 			select {
 			case payload := <-screenAndServePayload:
-				sap.sendResponse(payload)
+				sap.filterAndServe(payload)
 			case <-sap.QuitChan:
 				log.Info("quiting ScreenAndServe process")
 				wg.Done()
@@ -252,10 +257,12 @@ func (sap *Service) ScreenAndServe(wg *sync.WaitGroup, screenAndServePayload <-c
 			}
 		}
 	}()
-	log.Info("screenAndServe goroutine successfully spun up")
+	log.Info("FilterAndServe goroutine successfully spun up")
 }
 
-func (sap *Service) sendResponse(payload shared.StreamedIPLDs) {
+// filterAndServe filters the payload according to each subscription type and sends it to the subscriptions
+func (sap *Service) filterAndServe(payload shared.StreamedIPLDs) {
+	log.Debugf("Sending payload to subscriptions")
 	sap.Lock()
 	for ty, subs := range sap.Subscriptions {
 		// Retrieve the subscription parameters for this subscription type
@@ -273,8 +280,8 @@ func (sap *Service) sendResponse(payload shared.StreamedIPLDs) {
 		}
 		for id, sub := range subs {
 			select {
-			case sub.PayloadChan <- SubscriptionPayload{response, ""}:
-				log.Infof("sending super node payload to subscription %s", id)
+			case sub.PayloadChan <- SubscriptionPayload{Data: response, Err: "", Msg: ""}:
+				log.Debugf("sending super node payload to subscription %s", id)
 			default:
 				log.Infof("unable to send payload to subscription %s; channel has no receiver", id)
 			}
@@ -283,10 +290,10 @@ func (sap *Service) sendResponse(payload shared.StreamedIPLDs) {
 	sap.Unlock()
 }
 
-// Subscribe is used by the API to subscribe to the service loop
+// Subscribe is used by the API to remotely subscribe to the service loop
 // The params must be rlp serializable and satisfy the SubscriptionSettings() interface
 func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
-	log.Info("Subscribing to the super node service")
+	log.Infof("New subscription %s", id)
 	subscription := Subscription{
 		ID:          id,
 		PayloadChan: sub,
@@ -297,7 +304,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
 		sendNonBlockingQuit(subscription)
 		return
 	}
-	// Subscription type is defined as the hash of the subscription settings
+	// Subscription type is defined as the hash of the rlp-serialized subscription settings
 	by, err := rlp.EncodeToBytes(params)
 	if err != nil {
 		sendNonBlockingErr(subscription, err)
@@ -308,7 +315,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
 	// If the subscription requests a backfill, use the Postgres index to lookup and retrieve historical data
 	// Otherwise we only filter new data as it is streamed in from the state diffing geth node
 	if params.HistoricalData() || params.HistoricalDataOnly() {
-		if err := sap.backFill(subscription, id, params); err != nil {
+		if err := sap.sendHistoricalData(subscription, id, params); err != nil {
 			sendNonBlockingErr(subscription, err)
 			sendNonBlockingQuit(subscription)
 			return
@@ -326,8 +333,9 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
 	}
 }
 
-func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error {
-	log.Debug("sending historical data for subscriber", id)
+// sendHistoricalData sends historical data to the requesting subscription
+func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params shared.SubscriptionSettings) error {
+	log.Info("Sending historical data to subscription", id)
 	// Retrieve cached CIDs relevant to this subscriber
 	var endingBlock int64
 	var startingBlock int64
@@ -347,7 +355,7 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.Subscrip
 		endingBlock = params.EndingBlock().Int64()
 	}
 	log.Debug("historical data starting block:", params.StartingBlock())
-	log.Debug("histocial data ending block:", endingBlock)
+	log.Debug("historical data ending block:", endingBlock)
 	go func() {
 		for i := startingBlock; i <= endingBlock; i++ {
 			cidWrapper, empty, err := sap.Retriever.Retrieve(params, i)
@@ -369,19 +377,26 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, params shared.Subscrip
 				continue
 			}
 			select {
-			case sub.PayloadChan <- SubscriptionPayload{backFillIplds, ""}:
-				log.Infof("sending super node historical data payload to subscription %s", id)
+			case sub.PayloadChan <- SubscriptionPayload{Data: backFillIplds, Err: "", Msg: ""}:
+				log.Debugf("sending super node historical data payload to subscription %s", id)
 			default:
 				log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
 			}
 		}
+		// when the backfill is complete, send an empty payload signifying so in the Msg field
+		select {
+		case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Msg: "BACKFILL COMPLETE"}:
+			log.Debugf("sending backfill completion notice to subscription %s", id)
+		default:
+			log.Infof("unable to send backfill completion notice to subscription %s", id)
+		}
 	}()
 	return nil
 }
 
-// Unsubscribe is used to unsubscribe to the StateDiffingService loop
+// Unsubscribe is used by the API to remotely unsubscribe from the StateDiffingService loop
 func (sap *Service) Unsubscribe(id rpc.ID) {
-	log.Info("Unsubscribing from the super node service")
+	log.Infof("Unsubscribing %s from the super node service", id)
 	sap.Lock()
 	for ty := range sap.Subscriptions {
 		delete(sap.Subscriptions[ty], id)
@@ -395,18 +410,20 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
 }
 
 // Start is used to begin the service
+// This is mostly just to satisfy the node.Service interface
 func (sap *Service) Start(*p2p.Server) error {
 	log.Info("Starting super node service")
 	wg := new(sync.WaitGroup)
 	payloadChan := make(chan shared.StreamedIPLDs, PayloadChanBufferSize)
-	if err := sap.SyncAndPublish(wg, payloadChan); err != nil {
+	if err := sap.ProcessData(wg, payloadChan); err != nil {
 		return err
 	}
-	sap.ScreenAndServe(wg, payloadChan)
+	sap.FilterAndServe(wg, payloadChan)
 	return nil
 }
 
 // Stop is used to close down the service
+// This is mostly just to satisfy the node.Service interface
 func (sap *Service) Stop() error {
 	log.Info("Stopping super node service")
node service") sap.Lock() @@ -424,6 +441,7 @@ func (sap *Service) Node() core.Node { // close is used to close all listening subscriptions // close needs to be called with subscription access locked func (sap *Service) close() { + log.Info("Closing all subscriptions") for subType, subs := range sap.Subscriptions { for _, sub := range subs { sendNonBlockingQuit(sub) @@ -436,6 +454,7 @@ func (sap *Service) close() { // closeType is used to close all subscriptions of given type // closeType needs to be called with subscription access locked func (sap *Service) closeType(subType common.Hash) { + log.Infof("Closing all subscriptions of type %s", subType.String()) subs := sap.Subscriptions[subType] for _, sub := range subs { sendNonBlockingQuit(sub) diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index c4568ea6..517a4df0 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -63,7 +63,7 @@ var _ = Describe("Service", func() { QuitChan: quitChan, WorkerPoolSize: 1, } - err := processor.SyncAndPublish(wg, nil) + err := processor.ProcessData(wg, nil) Expect(err).ToNot(HaveOccurred()) time.Sleep(2 * time.Second) quitChan <- true diff --git a/pkg/super_node/subscription.go b/pkg/super_node/subscription.go index e693eef7..8dabf757 100644 --- a/pkg/super_node/subscription.go +++ b/pkg/super_node/subscription.go @@ -32,5 +32,6 @@ type Subscription struct { // It carries data of a type specific to the chain being supported/queried and an error message type SubscriptionPayload struct { Data shared.ServerResponse `json:"data"` // e.g. for Ethereum eth.StreamPayload - Err string `json:"err"` + Err string `json:"err"` // field for error + Msg string `json:"msg"` // field for message }