From b1361aaf8bc6f3a6ea0bcfeab4f84493dbab9bec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 16 Sep 2020 17:08:05 +0200 Subject: [PATCH] sectorstorage: wip manager work tracker --- extern/sector-storage/manager.go | 148 ++++------ extern/sector-storage/manager_calltracker.go | 271 ++++++++++++++++++ .../{calltracker.go => worker_calltracker.go} | 8 +- .../{localworker.go => worker_local.go} | 4 +- .../{work_tracker.go => worker_tracked.go} | 0 gen/main.go | 1 + 6 files changed, 337 insertions(+), 95 deletions(-) create mode 100644 extern/sector-storage/manager_calltracker.go rename extern/sector-storage/{calltracker.go => worker_calltracker.go} (71%) rename extern/sector-storage/{localworker.go => worker_local.go} (99%) rename extern/sector-storage/{work_tracker.go => worker_tracked.go} (100%) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 3b2c7e994..14ec875e9 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -72,9 +72,15 @@ type Manager struct { storage.Prover - resLk sync.Mutex - results map[storiface.CallID]result - waitRes map[storiface.CallID]chan struct{} + workLk sync.Mutex + work *statestore.StateStore + + callToWork map[storiface.CallID]workID + // used when we get an early return and there's no callToWork mapping + callRes map[storiface.CallID]chan result + + results map[workID]result + waitRes map[workID]chan struct{} } type result struct { @@ -96,8 +102,9 @@ type SealerConfig struct { type StorageAuth http.Header type WorkerStateStore *statestore.StateStore +type ManagerStateStore *statestore.StateStore -func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore) (*Manager, error) { +func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) { lstor, err := stores.NewLocal(ctx, ls, si, urls) if err != nil { return nil, err @@ -123,10 +130,15 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg Prover: prover, - results: map[storiface.CallID]result{}, - waitRes: map[storiface.CallID]chan struct{}{}, + work: mss, + workWait: map[workID]*sync.Cond{}, + callToWork: map[storiface.CallID]workID{}, + results: map[workID]result{}, + waitRes: map[workID]chan struct{}{}, } + // TODO: remove all non-running work from the work tracker + go m.sched.runSched() localTasks := []sealtasks.TaskType{ @@ -209,16 +221,16 @@ func schedNop(context.Context, Worker) error { return nil } -func schedFetch(wf waitFunc, sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { +func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { return func(ctx context.Context, worker Worker) error { - _, err := wf(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) + _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) return err } } func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error { return func(ctx context.Context, w Worker) error { - r, err := m.waitResult(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) + r, err := m.startWork(ctx)(w.ReadPiece(ctx, sink, sector, offset, size)) if err != nil { return err } @@ -251,7 +263,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err) @@ -278,12 +290,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect } unsealFetch := func(ctx context.Context, worker Worker) error { - if _, err := m.waitResult(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { + if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil { return xerrors.Errorf("copy sealed/cache sector data: %w", err) } if foundUnsealed { - if _, err := m.waitResult(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { + if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil { return xerrors.Errorf("copy unsealed sector data: %w", err) } } @@ -294,7 +306,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size) } err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error { - _, err := m.waitResult(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)) + _, err := m.startWork(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)) return err }) if err != nil { @@ -303,7 +315,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), + err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), m.readPiece(sink, sector, offset, size, &readOk)) if err != nil { return xerrors.Errorf("reading piece from sealed sector: %w", err) @@ -339,7 +351,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var out abi.PieceInfo err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error { - p, err := m.waitResult(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r)) + p, err := m.startWork(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r)) if err != nil { return err } @@ -354,6 +366,25 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke ctx, cancel := context.WithCancel(ctx) defer cancel() + wk, wait, err := m.getWork(ctx, "PreCommit1", sector, ticket, pieces) + if err != nil { + return nil, xerrors.Errorf("getWork: %w", err) + } + + waitRes := func() { + p, werr := m.waitWork(ctx, wk) + if werr != nil { + err = werr + return + } + out = p.(storage.PreCommit1Out) + } + + if wait { // already in progress + waitRes() + return + } + if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil { return nil, xerrors.Errorf("acquiring sector lock: %w", err) } @@ -362,12 +393,13 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing) - err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - p, err := m.waitResult(ctx)(w.SealPreCommit1(ctx, sector, ticket, pieces)) + err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + err := m.startWork(ctx, wk)(w.SealPreCommit1(ctx, sector, ticket, pieces)) if err != nil { return err } - out = p.(storage.PreCommit1Out) + waitRes() + return nil }) @@ -384,8 +416,8 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true) - err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - p, err := m.waitResult(ctx)(w.SealPreCommit2(ctx, sector, phase1Out)) + err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + p, err := m.startWork(ctx)(w.SealPreCommit2(ctx, sector, phase1Out)) if err != nil { return err } @@ -408,8 +440,8 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a // generally very cheap / fast, and transferring data is not worth the effort selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) - err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - p, err := m.waitResult(ctx)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) + err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { + p, err := m.startWork(ctx)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) if err != nil { return err } @@ -423,7 +455,7 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou selector := newTaskSelector() err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error { - p, err := m.waitResult(ctx)(w.SealCommit2(ctx, sector, phase1Out)) + p, err := m.startWork(ctx)(w.SealCommit2(ctx, sector, phase1Out)) if err != nil { return err } @@ -457,9 +489,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, - schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), + schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.waitResult(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) + _, err := m.startWork(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed)) return err }) if err != nil { @@ -475,9 +507,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU } err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, - schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), + schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), func(ctx context.Context, w Worker) error { - _, err := m.waitResult(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)) + _, err := m.startWork(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)) return err }) if err != nil { @@ -515,68 +547,6 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error { return err } -type waitFunc func(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error) - -func (m *Manager) waitResult(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error) { - return func(callID storiface.CallID, err error) (interface{}, error) { - if err != nil { - return nil, err - } - - m.resLk.Lock() - res, ok := m.results[callID] - if ok { - m.resLk.Unlock() - return res.r, res.err - } - - ch, ok := m.waitRes[callID] - if !ok { - ch = make(chan struct{}) - m.waitRes[callID] = ch - } - m.resLk.Unlock() - - select { - case <-ch: - m.resLk.Lock() - defer m.resLk.Unlock() - - res := m.results[callID] - delete(m.results, callID) - - return res.r, res.err - case <-ctx.Done(): - return nil, xerrors.Errorf("waiting for result: %w", ctx.Err()) - } - } -} - -func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error { - m.resLk.Lock() - defer m.resLk.Unlock() - - _, ok := m.results[callID] - if ok { - return xerrors.Errorf("result for call %v already reported") - } - - var err error - if serr != "" { - err = errors.New(serr) - } - - m.results[callID] = result{ - r: r, - err: err, - } - - close(m.waitRes[callID]) - delete(m.waitRes, callID) - - return nil -} - func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { return m.returnResult(callID, pi, err) } diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go new file mode 100644 index 000000000..d209cc1f0 --- /dev/null +++ b/extern/sector-storage/manager_calltracker.go @@ -0,0 +1,271 @@ +package sectorstorage + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "golang.org/x/xerrors" + "io" + + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +type workID struct { + Method string + Params string // json [...params] +} + +func (w *workID) String() string { + return fmt.Sprintf("%s(%s)", w.Method, w.Params) +} + +var _ fmt.Stringer = &workID{} + +type WorkStatus string +const ( + wsStarted WorkStatus = "started" // task started, not scheduled/running on a worker yet + wsRunning WorkStatus = "running" // task running on a worker, waiting for worker return + wsDone WorkStatus = "done" // task returned from the worker, results available +) + +type WorkState struct { + Status WorkStatus + + WorkerCall storiface.CallID // Set when entering wsRunning + WorkError string // Status = wsDone, set when failed to start work +} + +func (w *WorkState) UnmarshalCBOR(reader io.Reader) error { + panic("implement me") +} + +func newWorkID(method string, params ...interface{}) (workID, error) { + pb, err := json.Marshal(params) + if err != nil { + return workID{}, xerrors.Errorf("marshaling work params: %w", err) + } + + return workID{ + Method: method, + Params: string(pb), + }, nil +} + +// returns wait=true when the task is already tracked/running +func (m *Manager) getWork(ctx context.Context, method string, params ...interface{}) (wid workID, wait bool, err error) { + wid, err = newWorkID(method, params) + if err != nil { + return workID{}, false, xerrors.Errorf("creating workID: %w", err) + } + + m.workLk.Lock() + defer m.workLk.Unlock() + + have, err := m.work.Has(wid) + if err != nil { + return workID{}, false, xerrors.Errorf("failed to check if the task is already tracked: %w", err) + } + + if !have { + err := m.work.Begin(wid, WorkState{ + Status: wsStarted, + }) + if err != nil { + return workID{}, false, xerrors.Errorf("failed to track task start: %w", err) + } + + return wid, false, nil + } + + // already started + + return wid, true, nil +} + +func (m *Manager) startWork(ctx context.Context, wk workID) func(callID storiface.CallID, err error) error { + return func(callID storiface.CallID, err error) error { + m.workLk.Lock() + defer m.workLk.Unlock() + + if err != nil { + merr := m.work.Get(wk).Mutate(func(ws *WorkState) error { + ws.Status = wsDone + ws.WorkError = err.Error() + return nil + }) + + if merr != nil { + return xerrors.Errorf("failed to start work and to track the error; merr: %+v, err: %w", merr, err) + } + return err + } + + err = m.work.Get(wk).Mutate(func(ws *WorkState) error { + _, ok := m.results[wk] + if ok { + log.Warn("work returned before we started tracking it") + ws.Status = wsDone + } else { + ws.Status = wsRunning + } + ws.WorkerCall = callID + return nil + }) + if err != nil { + return xerrors.Errorf("registering running work: %w", err) + } + + m.callToWork[callID] = wk + + return nil + } +} + +func (m *Manager) waitWork(ctx context.Context, wid workID) (interface{}, error) { + m.workLk.Lock() + + var ws WorkState + if err := m.work.Get(wid).Get(&ws); err != nil { + m.workLk.Unlock() + return nil, xerrors.Errorf("getting work status: %w", err) + } + + if ws.Status == wsStarted { + m.workLk.Unlock() + return nil, xerrors.Errorf("waitWork called for work in 'started' state") + } + + // sanity check + wk := m.callToWork[ws.WorkerCall] + if wk != wid { + m.workLk.Unlock() + return nil, xerrors.Errorf("wrong callToWork mapping for call %s; expected %s, got %s", ws.WorkerCall, wid, wk) + } + + // make sure we don't have the result ready + cr, ok := m.callRes[ws.WorkerCall] + if ok { + delete(m.callToWork, ws.WorkerCall) + + if len(cr) == 1 { + err := m.work.Get(wk).End() + if err != nil { + m.workLk.Unlock() + // Not great, but not worth discarding potentially multi-hour computation over this + log.Errorf("marking work as done: %+v", err) + } + + res := <- cr + delete(m.callRes, ws.WorkerCall) + + m.workLk.Unlock() + return res.r, res.err + } + + m.workLk.Unlock() + return nil, xerrors.Errorf("something else in waiting on callRes") + } + + ch, ok := m.waitRes[wid] + if !ok { + ch = make(chan struct{}) + m.waitRes[wid] = ch + } + m.workLk.Unlock() + + select { + case <-ch: + m.workLk.Lock() + defer m.workLk.Unlock() + + res := m.results[wid] + delete(m.results, wid) + + err := m.work.Get(wk).End() + if err != nil { + // Not great, but not worth discarding potentially multi-hour computation over this + log.Errorf("marking work as done: %+v", err) + } + + return res.r, res.err + case <-ctx.Done(): + return nil, xerrors.Errorf("waiting for work result: %w", ctx.Err()) + } +} + +func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interface{}, error) { + m.workLk.Lock() + _, ok := m.callToWork[callID] + if ok { + m.workLk.Unlock() + return nil, xerrors.Errorf("can't wait for calls related to work") + } + + ch, ok := m.callRes[callID] + if !ok { + ch = make(chan result) + m.callRes[callID] = ch + } + m.workLk.Unlock() + + defer func() { + m.workLk.Lock() + defer m.workLk.Unlock() + + delete(m.callRes, callID) + }() + + select { + case res := <-ch: + return res.r, res.err + case <-ctx.Done(): + return nil, xerrors.Errorf("waiting for call result: %w", ctx.Err()) + } +} + +func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error { + var err error + if serr != "" { + err = errors.New(serr) + } + + res := result{ + r: r, + err: err, + } + + m.workLk.Lock() + defer m.workLk.Unlock() + + wid, ok := m.callToWork[callID] + if !ok { + rch, ok := m.callRes[callID] + if !ok { + rch = make(chan result, 1) + m.callRes[callID] = rch + } + + if len(rch) > 0 { + return xerrors.Errorf("callRes channel already has a response") + } + if cap(rch) == 0 { + return xerrors.Errorf("expected rch to be buffered") + } + + rch <- res + return nil + } + + _, ok = m.results[wid] + if ok { + return xerrors.Errorf("result for call %v already reported") + } + + m.results[wid] = res + + close(m.waitRes[wid]) + delete(m.waitRes, wid) + + return nil +} diff --git a/extern/sector-storage/calltracker.go b/extern/sector-storage/worker_calltracker.go similarity index 71% rename from extern/sector-storage/calltracker.go rename to extern/sector-storage/worker_calltracker.go index 8c5aff577..56909e68c 100644 --- a/extern/sector-storage/calltracker.go +++ b/extern/sector-storage/worker_calltracker.go @@ -6,7 +6,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) -type callTracker struct { +type workerCallTracker struct { st *statestore.StateStore // by CallID } @@ -25,13 +25,13 @@ type Call struct { Result []byte } -func (wt *callTracker) onStart(ci storiface.CallID) error { +func (wt *workerCallTracker) onStart(ci storiface.CallID) error { return wt.st.Begin(ci, &Call{ State: CallStarted, }) } -func (wt *callTracker) onDone(ci storiface.CallID, ret []byte) error { +func (wt *workerCallTracker) onDone(ci storiface.CallID, ret []byte) error { st := wt.st.Get(ci) return st.Mutate(func(cs *Call) error { cs.State = CallDone @@ -40,7 +40,7 @@ func (wt *callTracker) onDone(ci storiface.CallID, ret []byte) error { }) } -func (wt *callTracker) onReturned(ci storiface.CallID) error { +func (wt *workerCallTracker) onReturned(ci storiface.CallID) error { st := wt.st.Get(ci) return st.End() } diff --git a/extern/sector-storage/localworker.go b/extern/sector-storage/worker_local.go similarity index 99% rename from extern/sector-storage/localworker.go rename to extern/sector-storage/worker_local.go index 0a1a02397..67b9df5e1 100644 --- a/extern/sector-storage/localworker.go +++ b/extern/sector-storage/worker_local.go @@ -38,7 +38,7 @@ type LocalWorker struct { sindex stores.SectorIndex ret storiface.WorkerReturn - ct *callTracker + ct *workerCallTracker acceptTasks map[sealtasks.TaskType]struct{} } @@ -57,7 +57,7 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex: sindex, ret: ret, - ct: &callTracker{ + ct: &workerCallTracker{ st: cst, }, acceptTasks: acceptTasks, diff --git a/extern/sector-storage/work_tracker.go b/extern/sector-storage/worker_tracked.go similarity index 100% rename from extern/sector-storage/work_tracker.go rename to extern/sector-storage/worker_tracked.go diff --git a/gen/main.go b/gen/main.go index c7ae5bd57..95ace5583 100644 --- a/gen/main.go +++ b/gen/main.go @@ -77,6 +77,7 @@ func main() { err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage", sectorstorage.Call{}, + sectorstorage.WorkState{}, ) if err != nil { fmt.Println(err)