diff --git a/storage/sealing/states.go b/storage/sealing/states.go index a1babd17d..24f44fdf0 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -72,12 +72,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos()) if err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)}) } cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o) if err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)}) + return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) } return ctx.Send(SectorSealed{ diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index 0b9ced284..03a1bc139 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -71,7 +71,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi workers: []Worker{ NewLocalWorker(WorkerConfig{ SealProof: cfg.SealProofType, - TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece}, + TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize}, }, stor, lstor, si), }, scfg: cfg, @@ -248,7 +248,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a return nil, xerrors.Errorf("finding path for sector sealing: %w", err) } - candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) + candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best) if len(candidateWorkers) == 0 { return nil, xerrors.New("no suitable workers found") // TODO: wait? } @@ -281,8 +281,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error return xerrors.Errorf("finding sealed sector: %w", err) } - candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector + candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best) + // TODO: Remove sector from sealing stores // TODO: Move the sector to long-term storage return candidateWorkers[0].FinalizeSector(ctx, sector) } diff --git a/storage/sealmgr/stores/index.go b/storage/sealmgr/stores/index.go index cccf9134b..054bb9533 100644 --- a/storage/sealmgr/stores/index.go +++ b/storage/sealmgr/stores/index.go @@ -2,7 +2,6 @@ package stores import ( "context" - "math/bits" "net/url" gopath "path" "sort" @@ -139,17 +138,24 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se } func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) { - if bits.OnesCount(uint(ft)) != 1 { - return nil, xerrors.Errorf("findSector only works for a single file type") - } - i.lk.RLock() defer i.lk.RUnlock() - storageIDs := i.sectors[Decl{s, ft}] - out := make([]StorageInfo, len(storageIDs)) + storageIDs := map[ID]uint64{} - for j, id := range storageIDs { + for _, pathType := range pathTypes { + if ft&pathType == 0 { + continue + } + + for _, id := range i.sectors[Decl{s, ft}] { + storageIDs[id]++ + } + } + + out := make([]StorageInfo, 0, len(storageIDs)) + + for id, n := range storageIDs { st, ok := i.stores[id] if !ok { log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s) @@ -167,13 +173,13 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector urls[k] = rl.String() } - out[j] = StorageInfo{ + out = append(out, StorageInfo{ ID: id, URLs: urls, - Weight: st.info.Weight, + Weight: st.info.Weight * n, // storage with more sector types is better CanSeal: st.info.CanSeal, CanStore: st.info.CanStore, - } + }) } return out, nil diff --git a/storage/sealmgr/stores/remote.go b/storage/sealmgr/stores/remote.go index b2f7e5f21..ecb907cb1 100644 --- a/storage/sealmgr/stores/remote.go +++ b/storage/sealmgr/stores/remote.go @@ -47,7 +47,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing) if err != nil { - return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err + return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err) } for _, fileType := range pathTypes { diff --git a/storage/sealmgr/task.go b/storage/sealmgr/task.go index 10b804724..fd35a1204 100644 --- a/storage/sealmgr/task.go +++ b/storage/sealmgr/task.go @@ -5,6 +5,9 @@ type TaskType string const ( TTAddPiece TaskType = "seal/v0/addpiece" TTPreCommit1 TaskType = "seal/v0/precommit/1" - TTPreCommit2 TaskType = "seal/v0/precommit/2" // Commit1 is called here too + TTPreCommit2 TaskType = "seal/v0/precommit/2" + TTCommit1 TaskType = "seal/v0/commit/1" // NOTE: We use this to transfer the sector into miner-local storage for now; Don't use on workers! TTCommit2 TaskType = "seal/v0/commit/2" + + TTFinalize TaskType = "seal/v0/finalize" )