worker: Fix challengeThrottle
This commit is contained in:
parent
69e8516d1f
commit
f1ddb55252
4
extern/sector-storage/worker_local.go
vendored
4
extern/sector-storage/worker_local.go
vendored
@ -633,7 +633,7 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered
|
||||
for i, s := range sectors {
|
||||
if l.challengeThrottle != nil {
|
||||
select {
|
||||
case <-l.challengeThrottle:
|
||||
case l.challengeThrottle <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
return storiface.WindowPoStResult{}, xerrors.Errorf("context error waiting on challengeThrottle %w", err)
|
||||
}
|
||||
@ -643,7 +643,7 @@ func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.Registered
|
||||
defer wg.Done()
|
||||
defer func() {
|
||||
if l.challengeThrottle != nil {
|
||||
l.challengeThrottle <- struct{}{}
|
||||
<-l.challengeThrottle
|
||||
}
|
||||
}()
|
||||
|
||||
|
65
extern/sector-storage/worker_local_test.go
vendored
Normal file
65
extern/sector-storage/worker_local_test.go
vendored
Normal file
@ -0,0 +1,65 @@
|
||||
package sectorstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
)
|
||||
|
||||
type hangStore struct {
|
||||
stores.Store
|
||||
|
||||
challengeReads chan struct{}
|
||||
unhang chan struct{}
|
||||
}
|
||||
|
||||
func (s *hangStore) GenerateSingleVanillaProof(ctx context.Context, minerID abi.ActorID, si storiface.PostSectorChallenge, ppt abi.RegisteredPoStProof) ([]byte, error) {
|
||||
select {
|
||||
case s.challengeReads <- struct{}{}:
|
||||
default:
|
||||
panic("this shouldn't happen")
|
||||
}
|
||||
<-s.unhang
|
||||
<-s.challengeReads
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestWorkerChallengeThrottle(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
hs := &hangStore{
|
||||
challengeReads: make(chan struct{}, 8),
|
||||
unhang: make(chan struct{}),
|
||||
}
|
||||
|
||||
wcfg := WorkerConfig{
|
||||
MaxParallelChallengeReads: 8,
|
||||
}
|
||||
|
||||
lw := NewLocalWorker(wcfg, hs, nil, nil, nil, statestore.New(datastore.NewMapDatastore()))
|
||||
|
||||
var ch []storiface.PostSectorChallenge
|
||||
for i := 0; i < 128; i++ {
|
||||
ch = append(ch, storiface.PostSectorChallenge{
|
||||
SealProof: 0,
|
||||
SectorNumber: abi.SectorNumber(i),
|
||||
})
|
||||
}
|
||||
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
close(hs.unhang)
|
||||
}()
|
||||
|
||||
_, err := lw.GenerateWindowPoSt(ctx, abi.RegisteredPoStProof_StackedDrgWindow32GiBV1, 0, ch, 0, nil)
|
||||
require.NoError(t, err)
|
||||
}
|
Loading…
Reference in New Issue
Block a user