From e170487faf08bbfa863caacc4f22fbae9111105b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Mar 2023 13:56:23 +0100 Subject: [PATCH 1/5] post worker sched: Filter out disabled workers correctly --- storage/sealer/manager_post.go | 2 +- storage/sealer/sched_post.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/storage/sealer/manager_post.go b/storage/sealer/manager_post.go index 29748c8ed..40552722c 100644 --- a/storage/sealer/manager_post.go +++ b/storage/sealer/manager_post.go @@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s skipped = append(skipped, sk...) if err != nil { - retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err)) + retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err)) } flk.Unlock() } diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go index 1055227d8..0421ee8a1 100644 --- a/storage/sealer/sched_post.go +++ b/storage/sealer/sched_post.go @@ -124,6 +124,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand for wid, wr := range ps.workers { needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType) + if !wr.Enabled { + log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid) + continue + } + if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) { continue } From 4b99472b35b6dad5fbe12c339cafea9d92be1f5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Mar 2023 14:30:22 +0100 Subject: [PATCH 2/5] itests: Test PoSt worker RPC error handling --- itests/kit/ensemble.go | 2 ++ itests/kit/rpc.go | 6 +++++ itests/worker_test.go | 54 ++++++++++++++++++++++++++++++++---------- 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 6d4ca1c12..a96aa60ff 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node MinerNode: minerNode, RemoteListener: rl, options: options, + + Stop: func(ctx context.Context) error { return nil }, } n.inactive.workers = append(n.inactive.workers, worker) diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go index 5d40ac3e9..6b63eb1eb 100644 --- a/itests/kit/rpc.go +++ b/itests/kit/rpc.go @@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker { require.NoError(t, err) t.Cleanup(stop) + m.Stop = func(ctx context.Context) error { + srv.Close() + srv.CloseClientConnections() + return nil + } + m.ListenAddr, m.Worker = maddr, cl return m } diff --git a/itests/worker_test.go b/itests/worker_test.go index b002660f1..e6c740dbe 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) { sectors := 2 * 48 * 2 - client, miner, _, ens := kit.EnsembleWorker(t, + client, miner, _, _ := kit.EnsembleWorker(t, kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines kit.LatestActorsAt(-1), kit.ThroughRPC(), @@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) { di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) require.NoError(t, err) - bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0] - di = di.NextNotElapsed() - t.Log("Running one proving period") - waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2 - client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil)) - - t.Log("Waiting for post message") - bm.Stop() - tryDl := func(dl uint64) { p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK) require.NoError(t, err) @@ -398,10 +389,49 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) { tryDl(0) tryDl(40) tryDl(di.Index + 4) +} - lastPending, err := client.MpoolPending(ctx, types.EmptyTSK) +func TestWindowPostWorkerDisconnected(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("storageminer", "INFO") + + sectors := 2 * 48 * 2 + + client, miner, badWorker, ens := kit.EnsembleWorker(t, + kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines + kit.LatestActorsAt(-1), + kit.ThroughRPC(), + kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})) + + var goodWorker kit.TestWorker + ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start() + + maddr, err := miner.ActorAddress(ctx) require.NoError(t, err) - require.Len(t, lastPending, 0) + + di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + + di = di.NextNotElapsed() + + tryDl := func(dl uint64) { + p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK) + require.NoError(t, err) + require.Len(t, p, 1) + require.Equal(t, dl, p[0].Deadline) + } + tryDl(0) // this will run on the not-yet-bad badWorker + + err = badWorker.Stop(ctx) + require.NoError(t, err) + + tryDl(10) // will fail on the badWorker, then should retry on the goodWorker + + time.Sleep(15 * time.Second) + + tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled } func TestSchedulerRemoveRequest(t *testing.T) { From 9a295e58b0e3c45b91b8d13501e53bbb96073252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Mar 2023 14:30:58 +0100 Subject: [PATCH 3/5] post worker sched: Retry on alternative worker on RPC errors --- storage/sealer/sched_post.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go index 0421ee8a1..aa96f8eb3 100644 --- a/storage/sealer/sched_post.go +++ b/storage/sealer/sched_post.go @@ -2,6 +2,9 @@ package sealer import ( "context" + "errors" + "github.com/filecoin-project/go-jsonrpc" + "github.com/hashicorp/go-multierror" "math/rand" "sync" "time" @@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg } }() - selected := candidates[0] - worker := ps.workers[selected.id] + var rpcErrs error - return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error { - ps.lk.Unlock() - defer ps.lk.Lock() + for i, selected := range candidates { + worker := ps.workers[selected.id] - return work(ctx, worker.workerRpc) - }) + err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error { + ps.lk.Unlock() + defer ps.lk.Lock() + + return work(ctx, worker.workerRpc) + }) + if err == nil { + return nil + } + + // if the error is RPCConnectionError, try another worker, if not, return the error + if !errors.As(err, new(*jsonrpc.RPCConnectionError)) { + return err + } + + log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates)) + rpcErrs = multierror.Append(rpcErrs, err) + } + + return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs) } type candidateWorker struct { From b0ebdb6882a41503c363fc8e843469c04279c737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Mar 2023 14:46:26 +0100 Subject: [PATCH 4/5] make gen --- itests/worker_test.go | 12 ++---------- storage/sealer/sched_post.go | 4 ++-- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/itests/worker_test.go b/itests/worker_test.go index e6c740dbe..42c7acdb8 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -399,7 +399,7 @@ func TestWindowPostWorkerDisconnected(t *testing.T) { sectors := 2 * 48 * 2 - client, miner, badWorker, ens := kit.EnsembleWorker(t, + _, miner, badWorker, ens := kit.EnsembleWorker(t, kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines kit.LatestActorsAt(-1), kit.ThroughRPC(), @@ -408,14 +408,6 @@ func TestWindowPostWorkerDisconnected(t *testing.T) { var goodWorker kit.TestWorker ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start() - maddr, err := miner.ActorAddress(ctx) - require.NoError(t, err) - - di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) - require.NoError(t, err) - - di = di.NextNotElapsed() - tryDl := func(dl uint64) { p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK) require.NoError(t, err) @@ -424,7 +416,7 @@ func TestWindowPostWorkerDisconnected(t *testing.T) { } tryDl(0) // this will run on the not-yet-bad badWorker - err = badWorker.Stop(ctx) + err := badWorker.Stop(ctx) require.NoError(t, err) tryDl(10) // will fail on the badWorker, then should retry on the goodWorker diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go index aa96f8eb3..0e0c39768 100644 --- a/storage/sealer/sched_post.go +++ b/storage/sealer/sched_post.go @@ -3,14 +3,14 @@ package sealer import ( "context" "errors" - "github.com/filecoin-project/go-jsonrpc" - "github.com/hashicorp/go-multierror" "math/rand" "sync" "time" + "github.com/hashicorp/go-multierror" "golang.org/x/xerrors" + "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/storage/paths" From 9c2f8ee99563ddfb2f15ff7f6df8ff33b4db8ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Mar 2023 15:11:21 +0100 Subject: [PATCH 5/5] itests: Wait for both workers in TestWindowPostWorkerDisconnected --- itests/worker_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/itests/worker_test.go b/itests/worker_test.go index 42c7acdb8..4e845afe8 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -408,6 +408,13 @@ func TestWindowPostWorkerDisconnected(t *testing.T) { var goodWorker kit.TestWorker ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start() + // wait for all workers + require.Eventually(t, func() bool { + w, err := miner.WorkerStats(ctx) + require.NoError(t, err) + return len(w) == 3 // 2 post + 1 miner-builtin + }, 10*time.Second, 100*time.Millisecond) + tryDl := func(dl uint64) { p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK) require.NoError(t, err)