diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index 2f70f1cb3..f231099b6 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -144,60 +143,74 @@ func (p *Processor) Start(ctx context.Context) { "AccountChanges", len(actorChanges[builtin.AccountActorCodeID]), "nullRounds", len(nullRounds)) - grp, ctx := errgroup.WithContext(ctx) + grp := sync.WaitGroup{} - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil { - return xerrors.Errorf("Failed to handle market changes: %w", err) + log.Errorf("Failed to handle market changes: %w", err) + return nil } log.Info("Processed Market Changes") return nil - }) + }() - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil { - return xerrors.Errorf("Failed to handle miner changes: %w", err) + log.Errorf("Failed to handle miner changes: %w", err) + return nil } log.Info("Processed Miner Changes") return nil - }) + }() - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil { - return xerrors.Errorf("Failed to handle reward changes: %w", err) + log.Errorf("Failed to handle reward changes: %w", err) + return nil } log.Info("Processed Reward Changes") return nil - }) + }() - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil { return xerrors.Errorf("Failed to handle power actor changes: %w", err) } log.Info("Processes Power Changes") return nil - }) + }() - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandleMessageChanges(ctx, toProcess); err != nil { - return xerrors.Errorf("Failed to handle message changes: %w", err) + log.Errorf("Failed to handle message changes: %w", err) + return nil } log.Info("Processed Message Changes") return nil - }) + }() - grp.Go(func() error { + grp.Add(1) + go func() error { + defer grp.Done() if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil { - return xerrors.Errorf("Failed to handle common actor changes: %w", err) + log.Errorf("Failed to handle common actor changes: %w", err) + return nil } log.Info("Processed CommonActor Changes") return nil - }) + }() - if err := grp.Wait(); err != nil { - log.Errorw("Failed to handle actor changes...retrying", "error", err) - continue - } + grp.Wait() if err := p.markBlocksProcessed(ctx, toProcess); err != nil { log.Fatalw("Failed to mark blocks as processed", "error", err)