improved logging in super node service

Ian Norden 2020-03-08 11:43:29 -05:00
parent 9d2a30596e
commit aad318c67b
2 changed files with 14 additions and 14 deletions
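
The change is the same across both files: bare log.Error(err) calls (and a few terse messages) are replaced with log.Errorf calls that name the subsystem, the failing operation, and the chain. A minimal before/after sketch of the pattern, assuming the log import is logrus (github.com/sirupsen/logrus); doWork and chain below are illustrative stand-ins, not names from the repository:

package main

import (
	"errors"

	log "github.com/sirupsen/logrus"
)

// doWork is a stand-in for any call whose error we want to log with context.
func doWork() error {
	return errors.New("connection refused")
}

func main() {
	chain := "ethereum" // illustrative; the service uses bfs.chain.String() / sap.chain.String()

	if err := doWork(); err != nil {
		// Before: the bare error gives no hint of which chain or which step failed.
		log.Error(err)

		// After: the message names the subsystem, the operation, and the chain.
		log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", chain, err)
	}
}
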

View File

@@ -119,7 +119,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
 			log.Infof("searching for gaps in the %s super node database", bfs.chain.String())
 			startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
 			if err != nil {
-				log.Error(err)
+				log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.chain.String(), err)
 				continue
 			}
 			if startingBlock != 0 {
@@ -128,7 +128,7 @@ func (bfs *BackFillService) FillGapsInSuperNode(wg *sync.WaitGroup) {
 			}
 			gaps, err := bfs.Retriever.RetrieveGapsInData()
 			if err != nil {
-				log.Error(err)
+				log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.chain.String(), err)
 				continue
 			}
 			for _, gap := range gaps {
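
One detail worth noting in this hunk: the new message is a format string with %s and %v verbs, so it must go through log.Errorf; logrus's plain log.Error does no substitution and would print the verbs literally. A small sketch of the difference, again assuming logrus:

package main

import (
	"errors"

	log "github.com/sirupsen/logrus"
)

func main() {
	err := errors.New("no rows in result set")

	// Error does no formatting: the %s and %v verbs are printed literally,
	// followed by the stringified extra arguments.
	log.Error("backfill RetrieveGapsInData error for chain %s: %v", "ethereum", err)

	// Errorf substitutes the verbs, producing the intended message.
	log.Errorf("backfill RetrieveGapsInData error for chain %s: %v", "ethereum", err)
}
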
@@ -153,7 +153,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
 	for {
 		select {
 		case err := <-errChan:
-			log.Error(err)
+			log.Errorf("super node db backfill error for chain %s: %v", bfs.chain.String(), err)
 		case <-done:
 			log.Infof("finished filling in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock)
 			return nil
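
For context, fillGaps sits in a loop like the one above: every error coming back on errChan is logged and the loop keeps waiting, and only the done signal ends the call. A stripped-down sketch of that wait loop with hypothetical channel producers (not the repository's backFill):

package main

import (
	"errors"

	log "github.com/sirupsen/logrus"
)

// waitForBackfill mirrors the shape of the loop in fillGaps: every error that
// arrives on errChan is logged, and the call only returns once done fires.
func waitForBackfill(errChan <-chan error, done <-chan bool) error {
	for {
		select {
		case err := <-errChan:
			log.Errorf("backfill error: %v", err)
		case <-done:
			log.Info("backfill finished")
			return nil
		}
	}
}

func main() {
	errChan := make(chan error)
	done := make(chan bool)
	go func() {
		errChan <- errors.New("block 42 fetch failed")
		done <- true
	}()
	if err := waitForBackfill(errChan, done); err != nil {
		log.Fatal(err)
	}
}
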
@@ -165,7 +165,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
 // It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
 func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan chan error, done chan bool) error {
 	if endingBlock < startingBlock {
-		return fmt.Errorf("%s backfill: ending block number needs to be greater than starting block number", bfs.chain.String())
+		return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.chain.String())
 	}
 	//
 	// break the range up into bins of smaller ranges
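
The comment above summarizes backFill's approach: split [startingBlock, endingBlock] into bins of smaller ranges and work on the bins concurrently. A rough sketch of such a binning step; the helper name, bin layout, and types are assumptions for illustration, not the repository's implementation:

package main

import "fmt"

// blockRange is an illustrative pair of block heights, inclusive on both ends.
type blockRange struct {
	Start, End uint64
}

// makeBins splits [start, end] into consecutive ranges of at most binSize blocks.
func makeBins(start, end, binSize uint64) ([]blockRange, error) {
	if end < start {
		return nil, fmt.Errorf("ending block %d is less than starting block %d", end, start)
	}
	if binSize == 0 {
		return nil, fmt.Errorf("bin size must be non-zero")
	}
	var bins []blockRange
	for lo := start; lo <= end; lo += binSize {
		hi := lo + binSize - 1
		if hi > end {
			hi = end
		}
		bins = append(bins, blockRange{Start: lo, End: hi})
	}
	return bins, nil
}

func main() {
	bins, err := makeBins(100, 125, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(bins) // [{100 109} {110 119} {120 125}]
}
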

View File

@@ -210,7 +210,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
 		case payload := <-sap.PayloadChan:
 			ipldPayload, err := sap.Converter.Convert(payload)
 			if err != nil {
-				log.Error(err)
+				log.Errorf("super node conversion error for chain %s: %v", sap.chain.String(), err)
 				continue
 			}
 			// If we have a ScreenAndServe process running, forward the iplds to it
@@ -221,7 +221,7 @@ func (sap *Service) ProcessData(wg *sync.WaitGroup, screenAndServePayload chan<-
 			// Forward the payload to the publishAndIndex workers
 			publishAndIndexPayload <- ipldPayload
 		case err := <-sub.Err():
-			log.Error(err)
+			log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
 		case <-sap.QuitChan:
 			log.Infof("quiting %s SyncAndPublish process", sap.chain.String())
 			wg.Done()
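
Both ProcessData hunks live in one select loop that keeps draining the payload channel, the geth subscription's error channel, and the quit channel. A simplified sketch of that shape; the channel types and names are illustrative:

package main

import (
	"sync"

	log "github.com/sirupsen/logrus"
)

// processData is a stripped-down illustration of the ProcessData loop:
// convert incoming payloads, surface subscription errors, exit on quit.
func processData(wg *sync.WaitGroup, payloads <-chan []byte, subErrs <-chan error, quit <-chan struct{}) {
	defer wg.Done()
	for {
		select {
		case payload := <-payloads:
			log.Infof("converted payload of %d bytes", len(payload)) // Converter.Convert stand-in
		case err := <-subErrs:
			log.Errorf("subscription error: %v", err)
		case <-quit:
			log.Info("quitting process loop")
			return
		}
	}
}

func main() {
	payloads := make(chan []byte)
	subErrs := make(chan error)
	quit := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go processData(&wg, payloads, subErrs, quit)

	payloads <- []byte("raw statediff payload")
	close(quit)
	wg.Wait()
}
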
@@ -242,11 +242,11 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
 		case payload := <-publishAndIndexPayload:
 			cidPayload, err := sap.Publisher.Publish(payload)
 			if err != nil {
-				log.Errorf("worker %d error: %v", id, err)
+				log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
 				continue
 			}
 			if err := sap.Indexer.Index(cidPayload); err != nil {
-				log.Errorf("worker %d error: %v", id, err)
+				log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
 			}
 		}
 	}
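
publishAndIndex runs as a pool of workers fed from a single channel, which is why the worker id appears in the log message. A minimal sketch of that fan-out pattern; the payload type and worker count are made up for illustration:

package main

import (
	"sync"

	log "github.com/sirupsen/logrus"
)

// worker mimics the shape of publishAndIndex: each worker drains the shared
// payload channel, and its id is included in anything it logs.
func worker(id int, payloads <-chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for payload := range payloads {
		log.Infof("worker %d publishing and indexing %q", id, payload)
	}
}

func main() {
	payloads := make(chan string)
	var wg sync.WaitGroup

	const workerCount = 4 // illustrative; the service picks its own pool size
	for i := 1; i <= workerCount; i++ {
		wg.Add(1)
		go worker(i, payloads, &wg)
	}

	for _, p := range []string{"payload-1", "payload-2", "payload-3"} {
		payloads <- p
	}
	close(payloads)
	wg.Wait()
}
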
@@ -283,7 +283,7 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
 		// Retrieve the subscription parameters for this subscription type
 		subConfig, ok := sap.SubscriptionTypes[ty]
 		if !ok {
-			log.Errorf("%s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex())
+			log.Errorf("super node %s subscription configuration for subscription type %s not available", sap.chain.String(), ty.Hex())
 			sap.closeType(ty)
 			continue
 		}
@@ -295,13 +295,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
 		}
 		response, err := sap.Filterer.Filter(subConfig, payload)
 		if err != nil {
-			log.Error(err)
+			log.Errorf("super node filtering error for chain %s: %v", sap.chain.String(), err)
 			sap.closeType(ty)
 			continue
 		}
 		responseRLP, err := rlp.EncodeToBytes(response)
 		if err != nil {
-			log.Error(err)
+			log.Errorf("super node rlp encoding error for chain %s: %v", sap.chain.String(), err)
 			continue
 		}
 		for id, sub := range subs {
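
filterAndServe RLP-encodes each filtered response before pushing it to subscribers. A small standalone example of rlp.EncodeToBytes and the matching decode using go-ethereum's rlp package; demoResponse is a made-up type, not the service's response struct:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// demoResponse is a made-up payload; RLP encodes exported fields in order.
type demoResponse struct {
	BlockNumber uint64
	Data        []byte
}

func main() {
	resp := demoResponse{BlockNumber: 12345, Data: []byte("filtered iplds")}

	// Encode the struct to its RLP byte representation, as filterAndServe
	// does before handing the payload to subscribers.
	encoded, err := rlp.EncodeToBytes(resp)
	if err != nil {
		panic(err)
	}
	fmt.Printf("rlp: %x\n", encoded)

	// Decode it back to confirm the round trip.
	var decoded demoResponse
	if err := rlp.DecodeBytes(encoded, &decoded); err != nil {
		panic(err)
	}
	fmt.Printf("block %d, %s\n", decoded.BlockNumber, decoded.Data)
}
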
@@ -352,7 +352,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
 	// Otherwise we only filter new data as it is streamed in from the state diffing geth node
 	if params.HistoricalData() || params.HistoricalDataOnly() {
 		if err := sap.sendHistoricalData(subscription, id, params); err != nil {
-			sendNonBlockingErr(subscription, err)
+			sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err))
 			sendNonBlockingQuit(subscription)
 			return
 		}
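
sendNonBlockingErr wraps the backfill error with chain context before handing it to the subscription; the non-blocking part matters because a slow or abandoned subscriber must not stall the service. A hedged sketch of what such a helper typically looks like (an assumption about its shape, not the repository's code):

package main

import (
	"errors"
	"fmt"

	log "github.com/sirupsen/logrus"
)

// subscription is an illustrative stand-in with a buffered error channel.
type subscription struct {
	errChan chan error
}

// sendNonBlockingErrSketch drops the error if the subscriber's channel is full
// instead of blocking the serving goroutine; the select/default is the key bit.
func sendNonBlockingErrSketch(sub subscription, err error) {
	select {
	case sub.errChan <- err:
	default:
		log.Infof("subscriber error channel full, dropping: %v", err)
	}
}

func main() {
	sub := subscription{errChan: make(chan error, 1)}

	base := errors.New("retriever timeout")
	wrapped := fmt.Errorf("super node subscriber backfill error for chain %s: %v", "ethereum", base)

	sendNonBlockingErrSketch(sub, wrapped) // delivered: buffer has room
	sendNonBlockingErrSketch(sub, wrapped) // dropped: buffer is full

	fmt.Println(<-sub.errChan)
}
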
@@ -386,7 +386,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
 	for i := startingBlock; i <= endingBlock; i++ {
 		cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
 		if err != nil {
-			sendNonBlockingErr(sub, fmt.Errorf("%s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
+			sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
 			continue
 		}
 		if empty {
@@ -395,7 +395,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
 		for _, cids := range cidWrappers {
 			response, err := sap.IPLDFetcher.Fetch(cids)
 			if err != nil {
-				sendNonBlockingErr(sub, fmt.Errorf("%s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
+				sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
 				continue
 			}
 			responseRLP, err := rlp.EncodeToBytes(response)