workers: get sectors back to miner process after precommit

This commit is contained in:
Łukasz Magiera 2020-03-19 20:17:43 +01:00
parent 80cca91584
commit 314fba049f
5 changed files with 28 additions and 18 deletions

View File

@ -72,12 +72,12 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticket.Value, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), pc1o)
if err != nil {
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
}
return ctx.Send(SectorSealed{

View File

@ -71,7 +71,7 @@ func New(ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Confi
workers: []Worker{
NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece},
TaskTypes: []sealmgr.TaskType{sealmgr.TTAddPiece, sealmgr.TTCommit1, sealmgr.TTFinalize},
}, stor, lstor, si),
},
scfg: cfg,
@ -248,7 +248,7 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best)
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best)
if len(candidateWorkers) == 0 {
return nil, xerrors.New("no suitable workers found") // TODO: wait?
}
@ -281,8 +281,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error
return xerrors.Errorf("finding sealed sector: %w", err)
}
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTPreCommit2, best) // find last worker with the sector
candidateWorkers, _ := m.getWorkersByPaths(sealmgr.TTCommit1, best)
// TODO: Remove sector from sealing stores
// TODO: Move the sector to long-term storage
return candidateWorkers[0].FinalizeSector(ctx, sector)
}

View File

@ -2,7 +2,6 @@ package stores
import (
"context"
"math/bits"
"net/url"
gopath "path"
"sort"
@ -139,17 +138,24 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
}
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sectorbuilder.SectorFileType) ([]StorageInfo, error) {
if bits.OnesCount(uint(ft)) != 1 {
return nil, xerrors.Errorf("findSector only works for a single file type")
}
i.lk.RLock()
defer i.lk.RUnlock()
storageIDs := i.sectors[Decl{s, ft}]
out := make([]StorageInfo, len(storageIDs))
storageIDs := map[ID]uint64{}
for j, id := range storageIDs {
for _, pathType := range pathTypes {
if ft&pathType == 0 {
continue
}
for _, id := range i.sectors[Decl{s, ft}] {
storageIDs[id]++
}
}
out := make([]StorageInfo, 0, len(storageIDs))
for id, n := range storageIDs {
st, ok := i.stores[id]
if !ok {
log.Warnf("storage %s is not present in sector index (referenced by sector %v)", id, s)
@ -167,13 +173,13 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
urls[k] = rl.String()
}
out[j] = StorageInfo{
out = append(out, StorageInfo{
ID: id,
URLs: urls,
Weight: st.info.Weight,
Weight: st.info.Weight * n, // storage with more sector types is better
CanSeal: st.info.CanSeal,
CanStore: st.info.CanStore,
}
})
}
return out, nil

View File

@ -47,7 +47,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sec
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, err
return sectorbuilder.SectorPaths{}, sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("local acquire error: %w", err)
}
for _, fileType := range pathTypes {

View File

@ -5,6 +5,9 @@ type TaskType string
const (
TTAddPiece TaskType = "seal/v0/addpiece"
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2" // Commit1 is called here too
TTPreCommit2 TaskType = "seal/v0/precommit/2"
TTCommit1 TaskType = "seal/v0/commit/1" // NOTE: We use this to transfer the sector into miner-local storage for now; Don't use on workers!
TTCommit2 TaskType = "seal/v0/commit/2"
TTFinalize TaskType = "seal/v0/finalize"
)