diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go
index 6d4ca1c12..a96aa60ff 100644
--- a/itests/kit/ensemble.go
+++ b/itests/kit/ensemble.go
@@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
 		MinerNode:      minerNode,
 		RemoteListener: rl,
 		options:        options,
+
+		Stop: func(ctx context.Context) error { return nil },
 	}
 
 	n.inactive.workers = append(n.inactive.workers, worker)
diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go
index 5d40ac3e9..6b63eb1eb 100644
--- a/itests/kit/rpc.go
+++ b/itests/kit/rpc.go
@@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
 	require.NoError(t, err)
 	t.Cleanup(stop)
 
+	m.Stop = func(ctx context.Context) error {
+		srv.Close()
+		srv.CloseClientConnections()
+		return nil
+	}
+
 	m.ListenAddr, m.Worker = maddr, cl
 	return m
 }
diff --git a/itests/worker_test.go b/itests/worker_test.go
index b002660f1..4e845afe8 100644
--- a/itests/worker_test.go
+++ b/itests/worker_test.go
@@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 
 	sectors := 2 * 48 * 2
 
-	client, miner, _, ens := kit.EnsembleWorker(t,
+	client, miner, _, _ := kit.EnsembleWorker(t,
 		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
 		kit.LatestActorsAt(-1),
 		kit.ThroughRPC(),
@@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
 	require.NoError(t, err)
 
-	bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]
-
 	di = di.NextNotElapsed()
 
-	t.Log("Running one proving period")
-	waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
-	client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))
-
-	t.Log("Waiting for post message")
-	bm.Stop()
-
 	tryDl := func(dl uint64) {
 		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
 		require.NoError(t, err)
@@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
 	tryDl(0)
 	tryDl(40)
 	tryDl(di.Index + 4)
+}
 
-	lastPending, err := client.MpoolPending(ctx, types.EmptyTSK)
+func TestWindowPostWorkerDisconnected(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	_ = logging.SetLogLevel("storageminer", "INFO")
+
+	sectors := 2 * 48 * 2
+
+	_, miner, badWorker, ens := kit.EnsembleWorker(t,
+		kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
+		kit.LatestActorsAt(-1),
+		kit.ThroughRPC(),
+		kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))
+
+	var goodWorker kit.TestWorker
+	ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()
+
+	// wait for all workers
+	require.Eventually(t, func() bool {
+		w, err := miner.WorkerStats(ctx)
+		require.NoError(t, err)
+		return len(w) == 3 // 2 post + 1 miner-builtin
+	}, 10*time.Second, 100*time.Millisecond)
+
+	tryDl := func(dl uint64) {
+		p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
+		require.NoError(t, err)
+		require.Len(t, p, 1)
+		require.Equal(t, dl, p[0].Deadline)
+	}
+	tryDl(0) // this will run on the not-yet-bad badWorker
+
+	err := badWorker.Stop(ctx)
 	require.NoError(t, err)
-	require.Len(t, lastPending, 0)
+
+	tryDl(10) // will fail on the badWorker, then should retry on the goodWorker
+
+	time.Sleep(15 * time.Second)
+
+	tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
 }
 
 func TestSchedulerRemoveRequest(t *testing.T) {
diff --git a/storage/sealer/manager_post.go b/storage/sealer/manager_post.go
index 29748c8ed..40552722c 100644
--- a/storage/sealer/manager_post.go
+++ b/storage/sealer/manager_post.go
@@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
 			skipped = append(skipped, sk...)
 
 			if err != nil {
-				retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
+				retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
 			}
 			flk.Unlock()
 		}
diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go
index 1055227d8..0e0c39768 100644
--- a/storage/sealer/sched_post.go
+++ b/storage/sealer/sched_post.go
@@ -2,12 +2,15 @@ package sealer
 
 import (
 	"context"
+	"errors"
 	"math/rand"
 	"sync"
 	"time"
 
+	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
 
+	"github.com/filecoin-project/go-jsonrpc"
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/storage/paths"
@@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
 		}
 	}()
 
-	selected := candidates[0]
-	worker := ps.workers[selected.id]
+	var rpcErrs error
 
-	return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
-		ps.lk.Unlock()
-		defer ps.lk.Lock()
+	for i, selected := range candidates {
+		worker := ps.workers[selected.id]
 
-		return work(ctx, worker.workerRpc)
-	})
+		err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
+			ps.lk.Unlock()
+			defer ps.lk.Lock()
+
+			return work(ctx, worker.workerRpc)
+		})
+		if err == nil {
+			return nil
+		}
+
+		// if the error is RPCConnectionError, try another worker, if not, return the error
+		if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
+			return err
+		}
+
+		log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
+		rpcErrs = multierror.Append(rpcErrs, err)
+	}
+
+	return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
 }
 
 type candidateWorker struct {
@@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
 	for wid, wr := range ps.workers {
 		needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
 
+		if !wr.Enabled {
+			log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
+			continue
+		}
+
 		if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
 			continue
 		}