diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 2a857c60..851f601c 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -119,7 +119,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { log.Infof("searching for gaps in the %s super node database", bfs.chain.String()) startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() if err != nil { - log.Error(err) + log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err) continue } if startingBlock != 0 { @@ -128,7 +128,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) { } gaps, err := bfs.Retriever.RetrieveGapsInData() if err != nil { - log.Error(err) + log.Error("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err) continue } for _, gap := range gaps { @@ -153,7 +153,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error { for { select { case err := <-errChan: - log.Error(err) + log.Errorf("super node db backfill error for chain %s: %v", bfs.chain.String(), err) case <-done: log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) return nil @@ -165,7 +165,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error { // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error { if endingBlock < startingBlock { - return fmt.Errorf("%s backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) + return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String()) } // // break the range up into bins of smaller ranges diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 9ad7e401..01ebdeea 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -210,7 +210,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- case payload := <-sap.PayloadChan: ipldPayload, err := sap.Converter.Convert(payload) if err != nil { - log.Error(err) + log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err) continue } // If we have a ScreenAndServe process running, forward the iplds to it @@ -221,7 +221,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<- // Forward the payload to the publishAndIndex workers publishAndIndexPayload <- ipldPayload case err := <-sub.Err(): - log.Error(err) + log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err) case <-sap.QuitChan: log.Infof("quiting %s SyncAndPublish process", sap.chain.String()) wg.Done() @@ -242,11 +242,11 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared case payload := <-publishAndIndexPayload: cidPayload, err := sap.Publisher.Publish(payload) if err != nil { - log.Errorf("worker %d error: %v", id, err) + log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) continue } if err := sap.Indexer.Index(cidPayload); err != nil { - log.Errorf("worker %d error: %v", id, err) + log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) } } } @@ -283,7 +283,7 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { // Retrieve the subscription parameters for this subscription type subConfig, ok := sap.SubscriptionTypes[ty] if !ok { - log.Errorf("%s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex()) + log.Errorf("super node %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex()) sap.closeType(ty) continue } @@ -295,13 +295,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { } response, err := sap.Filterer.Filter(subConfig, payload) if err != nil { - log.Error(err) + log.Errorf("super node filtering error for chain %s: %v", sap.chain.String(), err) sap.closeType(ty) continue } responseRLP, err := rlp.EncodeToBytes(response) if err != nil { - log.Error(err) + log.Errorf("super node rlp encoding error for chain %s: %v", sap.chain.String(), err) continue } for id, sub := range subs { @@ -352,7 +352,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha // Otherwise we only filter new data as it is streamed in from the state diffing geth node if params.HistoricalData() || params.HistoricalDataOnly() { if err := sap.sendHistoricalData(subscription, id, params); err != nil { - sendNonBlockingErr(subscription, err) + sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err)) sendNonBlockingQuit(subscription) return } @@ -386,7 +386,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share for i := startingBlock; i <= endingBlock; i++ { cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("%s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) continue } if empty { @@ -395,7 +395,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share for _, cids := range cidWrappers { response, err := sap.IPLDFetcher.Fetch(cids) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("%s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) continue } responseRLP, err := rlp.EncodeToBytes(response)