fix(chainwatch): Make processor failures independent of each other
This commit is contained in:
parent
4311c96a44
commit
da8dbc8ffe
@ -8,7 +8,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -144,60 +143,74 @@ func (p *Processor) Start(ctx context.Context) {
|
|||||||
"AccountChanges", len(actorChanges[builtin.AccountActorCodeID]),
|
"AccountChanges", len(actorChanges[builtin.AccountActorCodeID]),
|
||||||
"nullRounds", len(nullRounds))
|
"nullRounds", len(nullRounds))
|
||||||
|
|
||||||
grp, ctx := errgroup.WithContext(ctx)
|
grp := sync.WaitGroup{}
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil {
|
if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle market changes: %w", err)
|
log.Errorf("Failed to handle market changes: %w", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("Processed Market Changes")
|
log.Info("Processed Market Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil {
|
if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle miner changes: %w", err)
|
log.Errorf("Failed to handle miner changes: %w", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("Processed Miner Changes")
|
log.Info("Processed Miner Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
|
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle reward changes: %w", err)
|
log.Errorf("Failed to handle reward changes: %w", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("Processed Reward Changes")
|
log.Info("Processed Reward Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil {
|
if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle power actor changes: %w", err)
|
return xerrors.Errorf("Failed to handle power actor changes: %w", err)
|
||||||
}
|
}
|
||||||
log.Info("Processes Power Changes")
|
log.Info("Processes Power Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle message changes: %w", err)
|
log.Errorf("Failed to handle message changes: %w", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("Processed Message Changes")
|
log.Info("Processed Message Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Add(1)
|
||||||
|
go func() error {
|
||||||
|
defer grp.Done()
|
||||||
if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
|
if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
|
||||||
return xerrors.Errorf("Failed to handle common actor changes: %w", err)
|
log.Errorf("Failed to handle common actor changes: %w", err)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
log.Info("Processed CommonActor Changes")
|
log.Info("Processed CommonActor Changes")
|
||||||
return nil
|
return nil
|
||||||
})
|
}()
|
||||||
|
|
||||||
if err := grp.Wait(); err != nil {
|
grp.Wait()
|
||||||
log.Errorw("Failed to handle actor changes...retrying", "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.markBlocksProcessed(ctx, toProcess); err != nil {
|
if err := p.markBlocksProcessed(ctx, toProcess); err != nil {
|
||||||
log.Fatalw("Failed to mark blocks as processed", "error", err)
|
log.Fatalw("Failed to mark blocks as processed", "error", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user