diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 462a54985..c7bbf09a7 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -444,6 +444,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, } wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix)) + smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix)) smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{ SealProofType: spt, @@ -454,7 +455,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, AllowPreCommit2: true, AllowCommit: true, AllowUnseal: true, - }, nil, sa, wsts) + }, nil, sa, wsts, smsts) if err != nil { return err } diff --git a/extern/sector-storage/cbor_gen.go b/extern/sector-storage/cbor_gen.go index c20df2157..137e32650 100644 --- a/extern/sector-storage/cbor_gen.go +++ b/extern/sector-storage/cbor_gen.go @@ -143,3 +143,151 @@ func (t *Call) UnmarshalCBOR(r io.Reader) error { return nil } +func (t *WorkState) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{163}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Status (sectorstorage.WorkStatus) (string) + if len("Status") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Status\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Status"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Status")); err != nil { + return err + } + + if len(t.Status) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Status was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Status))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.Status)); err != nil { + return err + } + + // t.WorkerCall (storiface.CallID) (struct) + if len("WorkerCall") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"WorkerCall\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("WorkerCall"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("WorkerCall")); err != nil { + return err + } + + if err := t.WorkerCall.MarshalCBOR(w); err != nil { + return err + } + + // t.WorkError (string) (string) + if len("WorkError") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"WorkError\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("WorkError"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("WorkError")); err != nil { + return err + } + + if len(t.WorkError) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.WorkError was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.WorkError))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.WorkError)); err != nil { + return err + } + return nil +} + +func (t *WorkState) UnmarshalCBOR(r io.Reader) error { + *t = WorkState{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("WorkState: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.Status (sectorstorage.WorkStatus) (string) + case "Status": + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.Status = WorkStatus(sval) + } + // t.WorkerCall (storiface.CallID) (struct) + case "WorkerCall": + + { + + if err := t.WorkerCall.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.WorkerCall: %w", err) + } + + } + // t.WorkError (string) (string) + case "WorkError": + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + t.WorkError = string(sval) + } + + default: + return fmt.Errorf("unknown struct field %d: '%s'", i, name) + } + } + + return nil +} diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 14ec875e9..a3f04037d 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -130,11 +130,11 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg Prover: prover, - work: mss, - workWait: map[workID]*sync.Cond{}, + work: mss, callToWork: map[storiface.CallID]workID{}, - results: map[workID]result{}, - waitRes: map[workID]chan struct{}{}, + callRes: map[storiface.CallID]chan result{}, + results: map[workID]result{}, + waitRes: map[workID]chan struct{}{}, } // TODO: remove all non-running work from the work tracker @@ -223,14 +223,14 @@ func schedNop(context.Context, Worker) error { func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { - _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) + _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) return err } } func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { return func(ctx context.Context, w Worker) error { - r, err := m.startWork(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) + r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) if err != nil { return err } @@ -263,7 +263,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err) @@ -290,12 +290,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect } unsealFetch := func(ctx context.Context, worker Worker) error { - if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { + if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { return xerrors.Errorf("copy sealed/cache sector data: %w", err) } if foundUnsealed { - if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { + if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { return xerrors.Errorf("copy unsealed sector data: %w", err) } } @@ -306,7 +306,8 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size) } err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { - _, err := m.startWork(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)) + // TODO: make restartable + _, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)) return err }) if err != nil { @@ -315,7 +316,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { return xerrors.Errorf("reading piece from sealed sector: %w", err) @@ -351,7 +352,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var out abi.PieceInfo err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error { - p, err := m.startWork(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r)) + p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r)) if err != nil { return err } @@ -398,8 +399,8 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke if err != nil { return err } - waitRes() + waitRes() return nil }) @@ -410,18 +411,38 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase ctx, cancel := context.WithCancel(ctx) defer cancel() + wk, wait, err := m.getWork(ctx, "PreCommit2", sector, phase1Out) + if err != nil { + return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err) + } + + waitRes := func() { + p, werr := m.waitWork(ctx, wk) + if werr != nil { + err = werr + return + } + out = p.(storage.SectorCids) + } + + if wait { // already in progress + waitRes() + return + } + if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err) } selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true) - err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - p, err := m.startWork(ctx)(w.SealPreCommit2(ctx, sector, phase1Out)) + err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + err := m.startWork(ctx, wk)(w.SealPreCommit2(ctx, sector, phase1Out)) if err != nil { return err } - out = p.(storage.SectorCids) + + waitRes() return nil }) return out, err @@ -431,6 +452,25 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a ctx, cancel := context.WithCancel(ctx) defer cancel() + wk, wait, err := m.getWork(ctx, "Commit1", sector, ticket, seed, pieces, cids) + if err != nil { + return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err) + } + + waitRes := func() { + p, werr := m.waitWork(ctx, wk) + if werr != nil { + err = werr + return + } + out = p.(storage.Commit1Out) + } + + if wait { // already in progress + waitRes() + return + } + if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil { return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err) } @@ -440,26 +480,47 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a // generally very cheap / fast, and transferring data is not worth the effort selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - p, err := m.startWork(ctx)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) + err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + err := m.startWork(ctx, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) if err != nil { return err } - out = p.(storage.Commit1Out) + + waitRes() return nil }) return out, err } func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) { + wk, wait, err := m.getWork(ctx, "Commit2", sector, phase1Out) + if err != nil { + return storage.Proof{}, xerrors.Errorf("getWork: %w", err) + } + + waitRes := func() { + p, werr := m.waitWork(ctx, wk) + if werr != nil { + err = werr + return + } + out = p.(storage.Proof) + } + + if wait { // already in progress + waitRes() + return + } + selector := newTaskSelector() err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error { - p, err := m.startWork(ctx)(w.SealCommit2(ctx, sector, phase1Out)) + err := m.startWork(ctx, wk)(w.SealCommit2(ctx, sector, phase1Out)) if err != nil { return err } - out = p.(storage.Proof) + + waitRes() return nil }) @@ -489,9 +550,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.startWork(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) + _, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err }) if err != nil { @@ -507,9 +568,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU } err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, - schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.startWork(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)) + _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)) return err }) if err != nil { diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go index d209cc1f0..8092f514a 100644 --- a/extern/sector-storage/manager_calltracker.go +++ b/extern/sector-storage/manager_calltracker.go @@ -2,11 +2,11 @@ package sectorstorage import ( "context" + "crypto/sha256" "encoding/json" "errors" "fmt" "golang.org/x/xerrors" - "io" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) @@ -16,7 +16,7 @@ type workID struct { Params string // json [...params] } -func (w *workID) String() string { +func (w workID) String() string { return fmt.Sprintf("%s(%s)", w.Method, w.Params) } @@ -36,16 +36,17 @@ type WorkState struct { WorkError string // Status = wsDone, set when failed to start work } -func (w *WorkState) UnmarshalCBOR(reader io.Reader) error { - panic("implement me") -} - func newWorkID(method string, params ...interface{}) (workID, error) { pb, err := json.Marshal(params) if err != nil { return workID{}, xerrors.Errorf("marshaling work params: %w", err) } + if len(pb) > 256 { + s := sha256.Sum256(pb) + pb = s[:] + } + return workID{ Method: method, Params: string(pb), @@ -68,7 +69,7 @@ func (m *Manager) getWork(ctx context.Context, method string, params ...interfac } if !have { - err := m.work.Begin(wid, WorkState{ + err := m.work.Begin(wid, &WorkState{ Status: wsStarted, }) if err != nil { @@ -194,6 +195,16 @@ func (m *Manager) waitWork(ctx context.Context, wid workID) (interface{}, error) } } +func (m *Manager) waitSimpleCall(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error) { + return func(callID storiface.CallID, err error) (interface{}, error) { + if err != nil { + return nil, err + } + + return m.waitCall(ctx, callID) + } +} + func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interface{}, error) { m.workLk.Lock() _, ok := m.callToWork[callID] @@ -204,7 +215,7 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf ch, ok := m.callRes[callID] if !ok { - ch = make(chan result) + ch = make(chan result, 1) m.callRes[callID] = ch } m.workLk.Unlock() diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index db32d655e..a4015c132 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -11,17 +11,19 @@ import ( "strings" "testing" + "github.com/google/uuid" + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" - - "github.com/filecoin-project/go-state-types/abi" - - "github.com/google/uuid" - logging "github.com/ipfs/go-log" - "github.com/stretchr/testify/require" ) func init() { @@ -111,8 +113,11 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st Prover: prover, - results: map[storiface.CallID]result{}, - waitRes: map[storiface.CallID]chan struct{}{}, + work: statestore.New(datastore.NewMapDatastore()), + callToWork: map[storiface.CallID]workID{}, + callRes: map[storiface.CallID]chan result{}, + results: map[workID]result{}, + waitRes: map[workID]chan struct{}{}, } go m.sched.runSched() diff --git a/extern/sector-storage/storiface/cbor_gen.go b/extern/sector-storage/storiface/cbor_gen.go new file mode 100644 index 000000000..0efbc125b --- /dev/null +++ b/extern/sector-storage/storiface/cbor_gen.go @@ -0,0 +1,142 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package storiface + +import ( + "fmt" + "io" + + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf + +func (t *CallID) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{162}); err != nil { + return err + } + + scratch := make([]byte, 9) + + // t.Sector (abi.SectorID) (struct) + if len("Sector") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"Sector\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Sector"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("Sector")); err != nil { + return err + } + + if err := t.Sector.MarshalCBOR(w); err != nil { + return err + } + + // t.ID (uuid.UUID) (array) + if len("ID") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"ID\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("ID"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("ID")); err != nil { + return err + } + + if len(t.ID) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.ID was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.ID))); err != nil { + return err + } + + if _, err := w.Write(t.ID[:]); err != nil { + return err + } + return nil +} + +func (t *CallID) UnmarshalCBOR(r io.Reader) error { + *t = CallID{} + + br := cbg.GetPeeker(r) + scratch := make([]byte, 8) + + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + if maj != cbg.MajMap { + return fmt.Errorf("cbor input should be of type map") + } + + if extra > cbg.MaxLength { + return fmt.Errorf("CallID: map struct too large (%d)", extra) + } + + var name string + n := extra + + for i := uint64(0); i < n; i++ { + + { + sval, err := cbg.ReadStringBuf(br, scratch) + if err != nil { + return err + } + + name = string(sval) + } + + switch name { + // t.Sector (abi.SectorID) (struct) + case "Sector": + + { + + if err := t.Sector.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.Sector: %w", err) + } + + } + // t.ID (uuid.UUID) (array) + case "ID": + + maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.ID: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra != 16 { + return fmt.Errorf("expected array to have 16 elements") + } + + t.ID = [16]uint8{} + + if _, err := io.ReadFull(br, t.ID[:]); err != nil { + return err + } + + default: + return fmt.Errorf("unknown struct field %d: '%s'", i, name) + } + } + + return nil +} diff --git a/gen/main.go b/gen/main.go index 95ace5583..c2adbb7a0 100644 --- a/gen/main.go +++ b/gen/main.go @@ -2,7 +2,6 @@ package main import ( "fmt" - sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "os" gen "github.com/whyrusleeping/cbor-gen" @@ -10,6 +9,8 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/types" + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/paychmgr" ) @@ -75,6 +76,14 @@ func main() { os.Exit(1) } + err = gen.WriteMapEncodersToFile("./extern/sector-storage/storiface/cbor_gen.go", "storiface", + storiface.CallID{}, + ) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage", sectorstorage.Call{}, sectorstorage.WorkState{}, diff --git a/lotuspond/front/src/chain/methods.json b/lotuspond/front/src/chain/methods.json index ad1076c84..ce4919cc4 100644 --- a/lotuspond/front/src/chain/methods.json +++ b/lotuspond/front/src/chain/methods.json @@ -23,7 +23,8 @@ "AddSigner", "RemoveSigner", "SwapSigner", - "ChangeNumApprovalsThreshold" + "ChangeNumApprovalsThreshold", + "LockBalance" ], "fil/1/paymentchannel": [ "Send", diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index e73ac06c3..af76861c1 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -517,13 +517,15 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S } var WorkerCallsPrefix = datastore.NewKey("/worker/calls") +var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls") func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) { ctx := helpers.LifecycleCtx(mctx, lc) wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix)) + smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix)) - sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa, wsts) + sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa, wsts, smsts) if err != nil { return nil, err }