Merge pull request #3254 from filecoin-project/fix/chainwatch/independent-processor-subtasks

fix(chainwatch): Make processor failures independent of each other
This commit is contained in:
Łukasz Magiera 2020-08-24 22:45:02 +02:00 committed by GitHub
commit ecc3ff4a5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 31 deletions

View File

@ -8,7 +8,6 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -144,60 +143,69 @@ func (p *Processor) Start(ctx context.Context) {
"AccountChanges", len(actorChanges[builtin.AccountActorCodeID]), "AccountChanges", len(actorChanges[builtin.AccountActorCodeID]),
"nullRounds", len(nullRounds)) "nullRounds", len(nullRounds))
grp, ctx := errgroup.WithContext(ctx) grp := sync.WaitGroup{}
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil { if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil {
return xerrors.Errorf("Failed to handle market changes: %w", err) log.Errorf("Failed to handle market changes: %w", err)
return
} }
log.Info("Processed Market Changes") log.Info("Processed Market Changes")
return nil }()
})
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil { if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil {
return xerrors.Errorf("Failed to handle miner changes: %w", err) log.Errorf("Failed to handle miner changes: %w", err)
return
} }
log.Info("Processed Miner Changes") log.Info("Processed Miner Changes")
return nil }()
})
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil { if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
return xerrors.Errorf("Failed to handle reward changes: %w", err) log.Errorf("Failed to handle reward changes: %w", err)
return
} }
log.Info("Processed Reward Changes") log.Info("Processed Reward Changes")
return nil }()
})
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil { if err := p.HandlePowerChanges(ctx, actorChanges[builtin.StoragePowerActorCodeID]); err != nil {
return xerrors.Errorf("Failed to handle power actor changes: %w", err) log.Errorf("Failed to handle power actor changes: %w", err)
return
} }
log.Info("Processes Power Changes") log.Info("Processes Power Changes")
return nil }()
})
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandleMessageChanges(ctx, toProcess); err != nil { if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
return xerrors.Errorf("Failed to handle message changes: %w", err) log.Errorf("Failed to handle message changes: %w", err)
return
} }
log.Info("Processed Message Changes") log.Info("Processed Message Changes")
return nil }()
})
grp.Go(func() error { grp.Add(1)
go func() {
defer grp.Done()
if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil { if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
return xerrors.Errorf("Failed to handle common actor changes: %w", err) log.Errorf("Failed to handle common actor changes: %w", err)
return
} }
log.Info("Processed CommonActor Changes") log.Info("Processed CommonActor Changes")
return nil }()
})
if err := grp.Wait(); err != nil { grp.Wait()
log.Errorw("Failed to handle actor changes...retrying", "error", err)
continue
}
if err := p.markBlocksProcessed(ctx, toProcess); err != nil { if err := p.markBlocksProcessed(ctx, toProcess); err != nil {
log.Fatalw("Failed to mark blocks as processed", "error", err) log.Fatalw("Failed to mark blocks as processed", "error", err)

View File

@ -24,7 +24,7 @@ var runCmd = &cli.Command{
Flags: []cli.Flag{ Flags: []cli.Flag{
&cli.IntFlag{ &cli.IntFlag{
Name: "max-batch", Name: "max-batch",
Value: 1000, Value: 50,
}, },
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {