sector import: RemoteSealingDoneEndpoint

This commit is contained in:
Łukasz Magiera 2022-09-09 14:38:23 +02:00
parent ef834b988c
commit 061a990eb8
11 changed files with 226 additions and 27 deletions

View File

@ -573,12 +573,17 @@ type RemoteSectorMeta struct {
// SEALING SERVICE HOOKS
// URL
// todo better doc
// RemoteCommit1Endpoint is an URL of POST endpoint which lotus will call requesting Commit1 (seal_commit_phase1)
// request body will be json-serialized RemoteCommit1Params struct
RemoteCommit1Endpoint string
// RemoteCommit2Endpoint is an URL of POST endpoint which lotus will call requesting Commit2 (seal_commit_phase2)
// request body will be json-serialized RemoteCommit2Params struct
RemoteCommit2Endpoint string
// todo OnDone / OnStateChange
// RemoteSealingDoneEndpoint is called after the sector exists the sealing pipeline
// request body will be json-serialized RemoteSealingDoneParams struct
RemoteSealingDoneEndpoint string
}
type RemoteCommit1Params struct {
@ -597,3 +602,15 @@ type RemoteCommit2Params struct {
// todo spec better
Commit1Out storiface.Commit1Out
}
type RemoteSealingDoneParams struct {
// Successful is true if the sector has entered state considered as "successfully sealed"
Successful bool
// State is the state the sector has entered
// For example "Proving" / "Removing"
State string
// Optional commit message CID
CommitMessage *cid.Cid
}

Binary file not shown.

View File

@ -3300,7 +3300,8 @@ Inputs:
]
},
"RemoteCommit1Endpoint": "string value",
"RemoteCommit2Endpoint": "string value"
"RemoteCommit2Endpoint": "string value",
"RemoteSealingDoneEndpoint": "string value"
}
]
```

View File

@ -109,7 +109,7 @@ func (tm *TestMiner) WaitSectorsProvingAllowFails(ctx context.Context, toCheck m
st, err := tm.StorageMiner.SectorsStatus(ctx, n, false)
require.NoError(tm.t, err)
states[st.State]++
if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) {
if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) || st.State == api.SectorState(sealing.Removed) {
delete(toCheck, n)
}
if strings.Contains(string(st.State), "Fail") {

View File

@ -5,10 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
spaths "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
"net/http"
"net/http/httptest"
"os"
@ -24,27 +20,37 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
spaths "github.com/filecoin-project/lotus/storage/paths"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)
func TestSectorImport(t *testing.T) {
type testCase struct {
c1handler func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request)
c1handler func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request)
mutateRemoteMeta func(*api.RemoteSectorMeta)
expectImportErrContains string
expectDoneSuccess bool
expectDoneState string
}
makeTest := func(mut func(*testCase)) *testCase {
tc := &testCase{
c1handler: testRemoteCommit1,
expectDoneSuccess: true,
expectDoneState: "Proving",
}
mut(tc)
return tc
@ -149,9 +155,12 @@ func TestSectorImport(t *testing.T) {
////////
// start http server serving sector data
doneResp := new(*api.RemoteSealingDoneParams)
m := mux.NewRouter()
m.HandleFunc("/sectors/{type}/{id}", testRemoteGetSector(sectorDir)).Methods("GET")
m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer)).Methods("POST")
m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer, miner)).Methods("POST")
m.HandleFunc("/sectors/{id}/sealed", testRemoteDone(doneResp)).Methods("POST")
m.HandleFunc("/commit2", testRemoteCommit2(sealer)).Methods("POST")
srv := httptest.NewServer(m)
@ -159,6 +168,8 @@ func TestSectorImport(t *testing.T) {
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)
doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum)
////////
// import the sector and continue sealing
@ -196,7 +207,9 @@ func TestSectorImport(t *testing.T) {
URL: cacheURL,
},
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
RemoteSealingDoneEndpoint: doneURL,
}
if tc.mutateRemoteMeta != nil {
@ -218,6 +231,13 @@ func TestSectorImport(t *testing.T) {
require.Equal(t, snum, ng[0])
miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommitFailed): {}})
require.NotNil(t, *doneResp)
require.Equal(t, tc.expectDoneSuccess, (*doneResp).Successful)
require.Equal(t, tc.expectDoneState, (*doneResp).State)
if tc.expectDoneSuccess {
require.NotNil(t, (*doneResp).CommitMessage)
}
}
}
@ -229,7 +249,7 @@ func TestSectorImport(t *testing.T) {
sealing.MinRetryTime = prt
})
testCase.c1handler = func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
var failedOnce bool
return func(w http.ResponseWriter, r *http.Request) {
@ -244,6 +264,35 @@ func TestSectorImport(t *testing.T) {
}
})))
t.Run("c1-fail-remove", runTest(makeTest(func(testCase *testCase) {
prt := sealing.MinRetryTime
sealing.MinRetryTime = time.Second
t.Cleanup(func() {
sealing.MinRetryTime = prt
})
testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := storiface.ParseSectorID(vars["id"])
if err != nil {
panic(err)
}
err = m.SectorRemove(r.Context(), id.Number)
if err != nil {
panic(err)
}
w.WriteHeader(http.StatusBadGateway)
}
}
testCase.expectDoneSuccess = false
testCase.expectDoneState = "Removing"
})))
t.Run("nil-commd", runTest(makeTest(func(testCase *testCase) {
testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
meta.CommD = nil
@ -302,8 +351,19 @@ func TestSectorImport(t *testing.T) {
// note: stuff below is almost the same as in _simple version of this file; We need
// to copy it because on Circle we can't call those functions between test files,
// and for the _simple test we want everything in one file to make it easy to follow
func testRemoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
*rs = new(api.RemoteSealingDoneParams)
if err := json.NewDecoder(r.Body).Decode(*rs); err != nil {
w.WriteHeader(500)
return
}
func testRemoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
}
func testRemoteCommit1(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

View File

@ -129,10 +129,14 @@ func TestSectorImportAfterPC2(t *testing.T) {
////////
// start http server serving sector data
doneResp := new(*api.RemoteSealingDoneParams)
m := mux.NewRouter()
m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET")
m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST")
m.HandleFunc("/sectors/{id}/sealed", remoteDone(doneResp)).Methods("POST")
m.HandleFunc("/commit2", remoteCommit2(sealer)).Methods("POST")
srv := httptest.NewServer(m)
unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
@ -140,6 +144,7 @@ func TestSectorImportAfterPC2(t *testing.T) {
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)
doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum)
////////
// import the sector and continue sealing
@ -177,8 +182,9 @@ func TestSectorImportAfterPC2(t *testing.T) {
URL: cacheURL,
},
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
RemoteCommit1Endpoint: remoteC1URL,
RemoteCommit2Endpoint: remoteC2URL,
RemoteSealingDoneEndpoint: doneURL,
})
require.NoError(t, err)
@ -189,6 +195,11 @@ func TestSectorImportAfterPC2(t *testing.T) {
require.Equal(t, snum, ng[0])
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{snum: {}})
require.NotNil(t, *doneResp)
require.True(t, (*doneResp).Successful)
require.Equal(t, "Proving", (*doneResp).State)
require.NotNil(t, (*doneResp).CommitMessage)
}
func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
@ -239,6 +250,18 @@ func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Req
}
}
func remoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
*rs = new(api.RemoteSealingDoneParams)
if err := json.NewDecoder(r.Body).Decode(*rs); err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
}
}
func remoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
var params api.RemoteCommit2Params

View File

@ -31,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{184, 38}); err != nil {
if _, err := cw.Write([]byte{184, 39}); err != nil {
return err
}
@ -749,6 +749,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.RemoteSealingDoneEndpoint (string) (string)
if len("RemoteSealingDoneEndpoint") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteSealingDoneEndpoint\" was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteSealingDoneEndpoint"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("RemoteSealingDoneEndpoint")); err != nil {
return err
}
if len(t.RemoteSealingDoneEndpoint) > cbg.MaxLength {
return xerrors.Errorf("Value in field t.RemoteSealingDoneEndpoint was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.RemoteSealingDoneEndpoint))); err != nil {
return err
}
if _, err := io.WriteString(w, string(t.RemoteSealingDoneEndpoint)); err != nil {
return err
}
// t.RemoteDataFinalized (bool) (bool)
if len("RemoteDataFinalized") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataFinalized\" was too long")
@ -1547,6 +1570,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
t.RemoteCommit2Endpoint = string(sval)
}
// t.RemoteSealingDoneEndpoint (string) (string)
case "RemoteSealingDoneEndpoint":
{
sval, err := cbg.ReadString(cr)
if err != nil {
return err
}
t.RemoteSealingDoneEndpoint = string(sval)
}
// t.RemoteDataFinalized (bool) (bool)
case "RemoteDataFinalized":

View File

@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"
@ -14,6 +15,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
)
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
@ -142,8 +145,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
),
FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalizedAvailable{}, Available),
onWithCB(SectorFinalized{}, Proving, maybeNotifyRemoteDone(true, "Proving")),
onWithCB(SectorFinalizedAvailable{}, Available, maybeNotifyRemoteDone(true, "Available")),
on(SectorFinalizeFailed{}, FinalizeFailed),
),
@ -218,7 +221,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryWaitSeed{}, WaitSeed),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorDealsExpired{}, DealsExpired),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
),
ComputeProofFailed: planOne(
@ -241,9 +244,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorRetryPreCommit{}, PreCommitting),
on(SectorRetryCommitWait{}, CommitWait),
on(SectorRetrySubmitCommit{}, SubmitCommit),
on(SectorDealsExpired{}, DealsExpired),
onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")),
on(SectorInvalidDealIDs{}, RecoverDealIDs),
on(SectorTicketExpired{}, Removing),
onWithCB(SectorTicketExpired{}, Removing, maybeNotifyRemoteDone(false, "Removing")),
),
FinalizeFailed: planOne(
on(SectorRetryFinalize{}, FinalizeSector),
@ -736,6 +739,16 @@ func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool,
}
}
func onWithCB(mut mutator, next SectorState, cb func(info *SectorInfo)) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
return mut, func(state *SectorInfo) (bool, error) {
cb(state)
state.State = next
return false, nil
}
}
}
// like `on`, but doesn't change state
func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) {
return func() (mutator, func(*SectorInfo) (bool, error)) {
@ -812,3 +825,41 @@ func planOneOrIgnore(ts ...func() (mut mutator, next func(*SectorInfo) (more boo
return cnt, nil
}
}
func maybeNotifyRemoteDone(success bool, state string) func(*SectorInfo) {
return func(sector *SectorInfo) {
if sector.RemoteSealingDoneEndpoint == "" {
return
}
reqData := api.RemoteSealingDoneParams{
Successful: success,
State: state,
CommitMessage: sector.CommitMessage,
}
reqBody, err := json.Marshal(&reqData)
if err != nil {
log.Errorf("marshaling remote done notification request params: %s", err)
return
}
req, err := http.NewRequest("POST", sector.RemoteSealingDoneEndpoint, bytes.NewReader(reqBody))
if err != nil {
log.Errorf("creating new remote done notification request: %s", err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Errorf("sending remote done notification: %s", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Errorf("remote done notification received non-200 http response %s", resp.Status)
return
}
}
}

View File

@ -524,6 +524,9 @@ func (evt SectorTerminateFailed) apply(*SectorInfo) {}
type SectorRemove struct{}
func (evt SectorRemove) applyGlobal(state *SectorInfo) bool {
// because this event is global we need to send the notification here instead through an fsm callback
maybeNotifyRemoteDone(false, "Removing")(state)
state.State = Removing
return true
}

View File

@ -211,6 +211,15 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
info.Pieces = meta.Pieces
info.SectorType = meta.Type
if meta.RemoteSealingDoneEndpoint != "" {
// validate the url
if _, err := url.Parse(meta.RemoteSealingDoneEndpoint); err != nil {
return SectorInfo{}, xerrors.Errorf("parsing remote sealing-done endpoint url: %w", err)
}
info.RemoteSealingDoneEndpoint = meta.RemoteSealingDoneEndpoint
}
if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil {
return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err)
}

View File

@ -94,12 +94,13 @@ type SectorInfo struct {
TerminatedAt abi.ChainEpoch
// Remote import
RemoteDataUnsealed *storiface.SectorData
RemoteDataSealed *storiface.SectorData
RemoteDataCache *storiface.SectorData
RemoteCommit1Endpoint string
RemoteCommit2Endpoint string
RemoteDataFinalized bool
RemoteDataUnsealed *storiface.SectorData
RemoteDataSealed *storiface.SectorData
RemoteDataCache *storiface.SectorData
RemoteCommit1Endpoint string
RemoteCommit2Endpoint string
RemoteSealingDoneEndpoint string
RemoteDataFinalized bool
// Debug
LastErr string