diff --git a/extern/sector-storage/cbor_gen.go b/extern/sector-storage/cbor_gen.go index 7ec29c795..a291487f0 100644 --- a/extern/sector-storage/cbor_gen.go +++ b/extern/sector-storage/cbor_gen.go @@ -17,12 +17,51 @@ func (t *Call) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{162}); err != nil { + if _, err := w.Write([]byte{164}); err != nil { return err } scratch := make([]byte, 9) + // t.ID (storiface.CallID) (struct) + if len("ID") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"ID\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("ID"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("ID")); err != nil { + return err + } + + if err := t.ID.MarshalCBOR(w); err != nil { + return err + } + + // t.RetType (sectorstorage.ReturnType) (string) + if len("RetType") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"RetType\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("RetType"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("RetType")); err != nil { + return err + } + + if len(t.RetType) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.RetType was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.RetType))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.RetType)); err != nil { + return err + } + // t.State (sectorstorage.CallState) (uint64) if len("State") > cbg.MaxLength { return xerrors.Errorf("Value in field \"State\" was too long") @@ -98,7 +137,28 @@ func (t *Call) UnmarshalCBOR(r io.Reader) error { } switch name { - // t.State (sectorstorage.CallState) (uint64) + // t.ID (storiface.CallID) (struct) + case "ID": + + { + + if err := t.ID.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.ID: %w", err) + } + + } + // t.RetType (sectorstorage.ReturnType) (string) + case "RetType": + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.RetType = ReturnType(sval) + } + // t.State (sectorstorage.CallState) (uint64) case "State": { diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index 8ddfd822e..9a47c3b55 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/google/uuid" "github.com/ipfs/go-datastore" @@ -203,6 +204,7 @@ func TestRedoPC1(t *testing.T) { require.Equal(t, 2, tw.pc1s) } +// Manager restarts in the middle of a task, restarts it, it completes func TestRestartManager(t *testing.T) { logging.SetAllLoggers(logging.LevelDebug) @@ -262,6 +264,8 @@ func TestRestartManager(t *testing.T) { m, lstor, _, _ = newTestMgr(ctx, t, ds) tw.ret = m // simulate jsonrpc auto-reconnect + err = m.AddWorker(ctx, tw) + require.NoError(t, err) tw.pc1lk.Unlock() @@ -270,3 +274,68 @@ func TestRestartManager(t *testing.T) { require.Equal(t, 1, tw.pc1s) } + +// Worker restarts in the middle of a task, task fails after restart +func TestRestartWorker(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + ctx, done := context.WithCancel(context.Background()) + defer done() + + ds := datastore.NewMapDatastore() + + m, lstor, stor, idx := newTestMgr(ctx, t, ds) + + localTasks := []sealtasks.TaskType{ + sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, + } + + wds := datastore.NewMapDatastore() + + arch := make(chan chan apres) + w := newLocalWorker(func() (ffiwrapper.Storage, error) { + return &testExec{apch: arch}, nil + }, WorkerConfig{ + SealProof: 0, + TaskTypes: localTasks, + }, stor, lstor, idx, m, statestore.New(wds)) + + err := m.AddWorker(ctx, w) + require.NoError(t, err) + + sid := abi.SectorID{Miner: 1000, Number: 1} + + apDone := make(chan struct{}) + + go func() { + defer close(apDone) + + _, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127))) + require.Error(t, err) + }() + + // kill the worker + <-arch + require.NoError(t, w.Close()) + + for { + if len(m.WorkerStats()) == 0 { + break + } + + time.Sleep(time.Millisecond * 3) + } + + // restart the worker + w = newLocalWorker(func() (ffiwrapper.Storage, error) { + return &testExec{apch: arch}, nil + }, WorkerConfig{ + SealProof: 0, + TaskTypes: localTasks, + }, stor, lstor, idx, m, statestore.New(wds)) + + err = m.AddWorker(ctx, w) + require.NoError(t, err) + + <-apDone +} diff --git a/extern/sector-storage/teststorage_test.go b/extern/sector-storage/teststorage_test.go new file mode 100644 index 000000000..da575a491 --- /dev/null +++ b/extern/sector-storage/teststorage_test.go @@ -0,0 +1,81 @@ +package sectorstorage + +import ( + "context" + "io" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/specs-actors/actors/runtime/proof" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +type apres struct { + pi abi.PieceInfo + err error +} + +type testExec struct { + apch chan chan apres +} + +func (t *testExec) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) { + panic("implement me") +} + +func (t *testExec) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.SectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error) { + panic("implement me") +} + +func (t *testExec) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + panic("implement me") +} + +func (t *testExec) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { + panic("implement me") +} + +func (t *testExec) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { + panic("implement me") +} + +func (t *testExec) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { + panic("implement me") +} + +func (t *testExec) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + panic("implement me") +} + +func (t *testExec) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + panic("implement me") +} + +func (t *testExec) Remove(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (t *testExec) NewSector(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (t *testExec) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + resp := make(chan apres) + t.apch <- resp + ar := <-resp + return ar.pi, ar.err +} + +func (t *testExec) UnsealPiece(ctx context.Context, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error { + panic("implement me") +} + +func (t *testExec) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { + panic("implement me") +} + +var _ ffiwrapper.Storage = &testExec{} \ No newline at end of file diff --git a/extern/sector-storage/worker_calltracker.go b/extern/sector-storage/worker_calltracker.go index a16ee33be..38fb39ee1 100644 --- a/extern/sector-storage/worker_calltracker.go +++ b/extern/sector-storage/worker_calltracker.go @@ -19,13 +19,18 @@ const ( ) type Call struct { + ID storiface.CallID + RetType ReturnType + State CallState Result []byte // json bytes } -func (wt *workerCallTracker) onStart(ci storiface.CallID) error { +func (wt *workerCallTracker) onStart(ci storiface.CallID, rt ReturnType) error { return wt.st.Begin(ci, &Call{ + ID: ci, + RetType:rt, State: CallStarted, }) } @@ -43,3 +48,8 @@ func (wt *workerCallTracker) onReturned(ci storiface.CallID) error { st := wt.st.Get(ci) return st.End() } + +func (wt *workerCallTracker) unfinished() ([]Call, error) { + var out []Call + return out, wt.st.List(&out) +} diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 210ea340c..009e11921 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -38,18 +38,21 @@ type LocalWorker struct { localStore *stores.Local sindex stores.SectorIndex ret storiface.WorkerReturn + executor func() (ffiwrapper.Storage, error) ct *workerCallTracker acceptTasks map[sealtasks.TaskType]struct{} + + closing chan struct{} } -func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { +func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { acceptTasks := map[sealtasks.TaskType]struct{}{} for _, taskType := range wcfg.TaskTypes { acceptTasks[taskType] = struct{}{} } - return &LocalWorker{ + w := &LocalWorker{ scfg: &ffiwrapper.Config{ SealProofType: wcfg.SealProof, }, @@ -62,7 +65,37 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, st: cst, }, acceptTasks: acceptTasks, + executor: executor, + + closing: make(chan struct{}), } + + if w.executor == nil { + w.executor = w.ffiExec + } + + unfinished, err := w.ct.unfinished() + if err != nil { + log.Errorf("reading unfinished tasks: %+v", err) + return w + } + + go func() { + for _, call := range unfinished { + err := xerrors.Errorf("worker restarted") + + if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil { + log.Errorf("return error: %s: %+v", call.RetType, err) + } + } + }() + + + return w +} + +func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker { + return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst) } type localWorkerPathProvider struct { @@ -101,11 +134,11 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi. }, nil } -func (l *LocalWorker) sb() (ffiwrapper.Storage, error) { +func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) { return ffiwrapper.New(&localWorkerPathProvider{w: l}, l.scfg) } -type returnType string +type ReturnType string // in: func(WorkerReturn, context.Context, CallID, err string) // in: func(WorkerReturn, context.Context, CallID, ret T, err string) @@ -123,7 +156,12 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor var ro []reflect.Value if withRet { - ro = rf.Call([]reflect.Value{rwr, rctx, rci, reflect.ValueOf(i), rerr}) + ret := reflect.ValueOf(i) + if i == nil { + ret = reflect.Zero(rf.Type().In(3)) + } + + ro = rf.Call([]reflect.Value{rwr, rctx, rci, ret, rerr}) } else { ro = rf.Call([]reflect.Value{rwr, rctx, rci, rerr}) } @@ -136,7 +174,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor } } -var returnFunc = map[returnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{ +var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{ "AddPiece": rfunc(storiface.WorkerReturn.ReturnAddPiece), "SealPreCommit1": rfunc(storiface.WorkerReturn.ReturnSealPreCommit1), "SealPreCommit2": rfunc(storiface.WorkerReturn.ReturnSealPreCommit2), @@ -150,13 +188,13 @@ var returnFunc = map[returnType]func(context.Context, storiface.CallID, storifac "Fetch": rfunc(storiface.WorkerReturn.ReturnFetch), } -func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt returnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { +func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt ReturnType, work func(ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) { ci := storiface.CallID{ Sector: sector, ID: uuid.New(), } - if err := l.ct.onStart(ci); err != nil { + if err := l.ct.onStart(ci, rt); err != nil { log.Errorf("tracking call (start): %+v", err) } @@ -196,7 +234,7 @@ func errstr(err error) string { } func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return err } @@ -205,7 +243,7 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error } func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -240,7 +278,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t } } - sb, err := l.sb() + sb, err := l.executor() if err != nil { return nil, err } @@ -250,7 +288,7 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t } func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -261,7 +299,7 @@ func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, p } func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -272,7 +310,7 @@ func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, tick } func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -283,7 +321,7 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phas } func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -330,7 +368,7 @@ func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, type } func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -353,7 +391,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde } func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { - sb, err := l.sb() + sb, err := l.executor() if err != nil { return storiface.UndefCall, err } @@ -405,10 +443,11 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) { } func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error) { - return make(chan struct{}), nil + return l.closing, nil } func (l *LocalWorker) Close() error { + close(l.closing) return nil } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index af76861c1..80bab7868 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/filecoin-project/go-statestore" "net/http" "time" @@ -43,6 +42,7 @@ import ( "github.com/filecoin-project/go-multistore" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" @@ -50,15 +50,15 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/markets" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/blockstore" + "github.com/filecoin-project/lotus/markets" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/miner"