respond to caller. Add itest

This commit is contained in:
LexLuthr 2022-08-03 19:37:02 +05:30
parent cdc08e566f
commit fad79f3218
2 changed files with 77 additions and 4 deletions

View File

@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -14,6 +15,7 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/kit"
@ -21,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/wdpost" "github.com/filecoin-project/lotus/storage/wdpost"
@ -401,3 +404,75 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Len(t, lastPending, 0) require.Len(t, lastPending, 0)
} }
func TestSchedulerRemoveRequest(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Priority int
SchedId uuid.UUID
}
type SchedDiagInfo struct {
Requests []SchedDiagRequestInfo
OpenWindows []string
}
type i struct {
SchedInfo interface{}
ReturnedWork []string
Waiting []string
CallToWork map[string]string
EarlyRet []string
}
go miner.PledgeSectors(ctx, 1, 0, nil)
// Keep checking till sector state is PC2, the request should get stuck as worker cannot process PC2
for {
st, err := miner.SectorsStatus(ctx, 1, false)
require.NoError(t, err)
if st.State == api.SectorState(sealing.PreCommit2) {
break
}
}
// Dump current scheduler info
schedb, err := miner.SealingSchedDiag(ctx, false)
if err != nil {
t.Log("Failed to dump scheduler state before: %w", err)
t.FailNow()
}
var schedidb uuid.UUID
// cast scheduler info and get the request UUID. Call the SealingRemoveRequest()
if schedb.(i).SchedInfo.(SchedDiagInfo).Requests[0].TaskType == "seal/v0/precommit/2" {
schedidb = schedb.(i).SchedInfo.(SchedDiagInfo).Requests[0].SchedId
err = miner.SealingRemoveRequest(ctx, schedidb)
if err != nil {
t.Log("Failed to dump scheduler state before: %w", err)
t.FailNow()
}
}
// Dump the schduler again and compare the UUID if a request is present
// If no request present then pass the test
scheda, err := miner.SealingSchedDiag(ctx, false)
if err != nil {
t.Log("Failed to dump scheduler state before: %w", err)
t.FailNow()
}
if len(scheda.(i).SchedInfo.(SchedDiagInfo).Requests) > 0 && scheda.(i).SchedInfo.(SchedDiagInfo).Requests[0].TaskType == "seal/v0/precommit/2" {
schedida := scheda.(i).SchedInfo.(SchedDiagInfo).Requests[0].SchedId
require.NotEqual(t, schedida, schedidb)
}
}

View File

@ -406,6 +406,7 @@ func (sh *Scheduler) removeRequest(rmrequest *rmRequest) {
if r.SchedId == rmrequest.id { if r.SchedId == rmrequest.id {
queue.Remove(i) queue.Remove(i)
rmrequest.res <- nil rmrequest.res <- nil
go r.respond(xerrors.Errorf("scheduling request removed"))
return return
} }
} }
@ -428,10 +429,7 @@ func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error
select { select {
case resp := <-ret: case resp := <-ret:
if resp != nil { return resp
return resp
}
return nil
case <-sh.closing: case <-sh.closing:
return xerrors.New("closing") return xerrors.New("closing")
case <-ctx.Done(): case <-ctx.Done():