workers: PoSt challenge throttle/timeout config

Łukasz Magiera 2022-03-24 19:42:44 -04:00
parent 13701c7ce2
commit 0b3144f566
2 changed files with 60 additions and 13 deletions

@@ -192,6 +192,16 @@ var runCmd = &cli.Command{
Usage: "maximum fetch operations to run in parallel",
Value: 5,
},
&cli.IntFlag{
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
Value: 0,
},
&cli.DurationFlag{
Name: "post-read-timeout",
Usage: "time limit for reading PoSt challenges (0 = no limit)",
Value: 0,
},
&cli.StringFlag{
Name: "timeout",
Usage: "used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function",
@@ -449,8 +459,10 @@ var runCmd = &cli.Command{
workerApi := &sealworker.Worker{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
TaskTypes: taskTypes,
NoSwap: cctx.Bool("no-swap"),
MaxParallelChallengeReads: cctx.Int("post-parallel-reads"),
ChallengeReadTimeout: cctx.Duration("post-read-timeout"),
}, remote, localStore, nodeApi, nodeApi, wsts),
LocalStore: localStore,
Storage: lr,

@@ -39,6 +39,9 @@ type WorkerConfig struct {
// worker regardless of its currently available resources. Used in testing
// with the local worker.
IgnoreResourceFiltering bool
MaxParallelChallengeReads int // 0 = no limit
ChallengeReadTimeout time.Duration // 0 = no timeout
}
// used to provide a custom proofs impl (mostly used in testing)
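For illustration, a hedged sketch of setting the two new WorkerConfig fields directly; the extern/sector-storage import path (as used around the time of this commit) and the example values are assumptions, not part of this change:

package main

import (
	"fmt"
	"time"

	sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
)

func main() {
	// Leaving either field at its zero value keeps the previous behaviour:
	// unlimited parallel challenge reads and no per-read deadline.
	cfg := sectorstorage.WorkerConfig{
		MaxParallelChallengeReads: 32,               // assumed example: at most 32 challenge reads in flight
		ChallengeReadTimeout:      75 * time.Second, // assumed example: give up on a single read after 75s
	}
	fmt.Println(cfg.MaxParallelChallengeReads, cfg.ChallengeReadTimeout)
}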
@@ -62,6 +65,9 @@ type LocalWorker struct {
running sync.WaitGroup
taskLk sync.Mutex
challengeThrottle chan struct{}
challengeReadTimeout time.Duration
session uuid.UUID
testDisable int64
closing chan struct{}
@@ -82,13 +88,18 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
ct: &workerCallTracker{
st: cst,
},
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
envLookup: envLookup,
ignoreResources: wcfg.IgnoreResourceFiltering,
session: uuid.New(),
closing: make(chan struct{}),
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
envLookup: envLookup,
ignoreResources: wcfg.IgnoreResourceFiltering,
challengeReadTimeout: wcfg.ChallengeReadTimeout,
session: uuid.New(),
closing: make(chan struct{}),
}
if wcfg.MaxParallelChallengeReads > 0 {
w.challengeThrottle = make(chan struct{}, wcfg.MaxParallelChallengeReads)
}
if w.executor == nil {
@@ -566,7 +577,9 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere
return nil, err
}
// todo throttle config
// don't throttle winningPoSt
// * Always want it done asap
// * It's usually just one sector
var wg sync.WaitGroup
wg.Add(len(sectors))
@@ -577,7 +590,12 @@ func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.Registere
go func(i int, s storiface.PostSectorChallenge) {
defer wg.Done()
// todo context with tighter deadline (+config)
if l.challengeReadTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, l.challengeReadTimeout)
defer cancel()
}
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, s, ppt)
if err != nil {
rerr = multierror.Append(rerr, xerrors.Errorf("get winning sector:%d, vanilla failed: %w", s.SectorNumber, err))
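The timeout is applied per challenge read by shadowing ctx inside each goroutine and deferring the cancel. A minimal standalone sketch of that idiom (slowRead, the 100ms limit, and the loop bounds are assumptions for illustration, not the worker's actual code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// slowRead stands in for a single challenge read such as GenerateSingleVanillaProof.
func slowRead(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the per-read limit passes
	}
}

func main() {
	readTimeout := 100 * time.Millisecond // per-read limit; 0 would mean "no timeout"

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ctx := context.Background()
			if readTimeout > 0 {
				// Shadow ctx with a deadline scoped to this one read.
				var cancel context.CancelFunc
				ctx, cancel = context.WithTimeout(ctx, readTimeout)
				defer cancel()
			}
			err := slowRead(ctx, time.Duration(i)*80*time.Millisecond)
			fmt.Printf("read %d: %v\n", i, err)
		}(i)
	}
	wg.Wait()
}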
@@ -607,17 +625,34 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered
var slk sync.Mutex
var skipped []abi.SectorID
// todo throttle config
var wg sync.WaitGroup
wg.Add(len(sectors))
vproofs := make([][]byte, len(sectors))
for i, s := range sectors {
if l.challengeThrottle != nil {
select {
case l.challengeThrottle <- struct{}{}:
case <-ctx.Done():
return storiface.WindowPoStResult{}, xerrors.Errorf("context error waiting on challengeThrottle: %w", ctx.Err())
}
}
go func(i int, s storiface.PostSectorChallenge) {
defer wg.Done()
defer func() {
if l.challengeThrottle != nil {
<-l.challengeThrottle
}
}()
if l.challengeReadTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, l.challengeReadTimeout)
defer cancel()
}
// todo context with tighter deadline (+config)
vanilla, err := l.storage.GenerateSingleVanillaProof(ctx, mid, s, ppt)
slk.Lock()
defer slk.Unlock()
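Taken together, the window PoSt path treats the throttle as a buffered-channel semaphore: a slot is acquired by sending into the channel (honouring ctx cancellation) and released by receiving from it once the read finishes. A standalone sketch of that pattern (readChallenge, the limit of 4, and the loop of 16 reads are illustrative assumptions, not the worker's actual code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// readChallenge stands in for one vanilla-proof read.
func readChallenge(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	const maxParallelReads = 4 // e.g. the post-parallel-reads setting; 0 would mean the throttle is never created
	throttle := make(chan struct{}, maxParallelReads)

	ctx := context.Background()
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		// Acquire a slot: blocks once maxParallelReads reads are already in flight.
		select {
		case throttle <- struct{}{}:
		case <-ctx.Done():
			fmt.Println("gave up waiting for a slot:", ctx.Err())
			return
		}

		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-throttle }() // release the slot when this read finishes
			if err := readChallenge(ctx); err != nil {
				fmt.Println("read", i, "failed:", err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("all reads done")
}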