Merge pull request #10394 from filecoin-project/fix/disabled-post-worker-handling

fix: wdpost: disabled post worker handling
This commit is contained in:
Aayush Rajasekaran 2023-03-06 10:22:46 -05:00 committed by GitHub
commit fea7dfd212
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 81 additions and 20 deletions

View File

@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
MinerNode: minerNode, MinerNode: minerNode,
RemoteListener: rl, RemoteListener: rl,
options: options, options: options,
Stop: func(ctx context.Context) error { return nil },
} }
n.inactive.workers = append(n.inactive.workers, worker) n.inactive.workers = append(n.inactive.workers, worker)

View File

@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(stop) t.Cleanup(stop)
m.Stop = func(ctx context.Context) error {
srv.Close()
srv.CloseClientConnections()
return nil
}
m.ListenAddr, m.Worker = maddr, cl m.ListenAddr, m.Worker = maddr, cl
return m return m
} }

View File

@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
sectors := 2 * 48 * 2 sectors := 2 * 48 * 2
client, miner, _, ens := kit.EnsembleWorker(t, client, miner, _, _ := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1), kit.LatestActorsAt(-1),
kit.ThroughRPC(), kit.ThroughRPC(),
@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err) require.NoError(t, err)
bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]
di = di.NextNotElapsed() di = di.NextNotElapsed()
t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
t.Log("Waiting for post message")
bm.Stop()
tryDl := func(dl uint64) { tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK) p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err) require.NoError(t, err)
@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
tryDl(0) tryDl(0)
tryDl(40) tryDl(40)
tryDl(di.Index + 4) tryDl(di.Index + 4)
}
lastPending, err := client.MpoolPending(ctx, types.EmptyTSK) func TestWindowPostWorkerDisconnected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("storageminer", "INFO")
sectors := 2 * 48 * 2
_, miner, badWorker, ens := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
var goodWorker kit.TestWorker
ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()
// wait for all workers
require.Eventually(t, func() bool {
w, err := miner.WorkerStats(ctx)
require.NoError(t, err)
return len(w) == 3 // 2 post + 1 miner-builtin
}, 10*time.Second, 100*time.Millisecond)
tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err)
require.Len(t, p, 1)
require.Equal(t, dl, p[0].Deadline)
}
tryDl(0) // this will run on the not-yet-bad badWorker
err := badWorker.Stop(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, lastPending, 0)
tryDl(10) // will fail on the badWorker, then should retry on the goodWorker
time.Sleep(15 * time.Second)
tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
} }
func TestSchedulerRemoveRequest(t *testing.T) { func TestSchedulerRemoveRequest(t *testing.T) {

View File

@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
skipped = append(skipped, sk...) skipped = append(skipped, sk...)
if err != nil { if err != nil {
retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err)) retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
} }
flk.Unlock() flk.Unlock()
} }

View File

@ -2,12 +2,15 @@ package sealer
import ( import (
"context" "context"
"errors"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
} }
}() }()
selected := candidates[0] var rpcErrs error
worker := ps.workers[selected.id]
return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error { for i, selected := range candidates {
ps.lk.Unlock() worker := ps.workers[selected.id]
defer ps.lk.Lock()
return work(ctx, worker.workerRpc) err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
}) ps.lk.Unlock()
defer ps.lk.Lock()
return work(ctx, worker.workerRpc)
})
if err == nil {
return nil
}
// if the error is RPCConnectionError, try another worker, if not, return the error
if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
return err
}
log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
rpcErrs = multierror.Append(rpcErrs, err)
}
return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
} }
type candidateWorker struct { type candidateWorker struct {
@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
for wid, wr := range ps.workers { for wid, wr := range ps.workers {
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType) needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
if !wr.Enabled {
log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
continue
}
if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) { if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue continue
} }