Merge pull request #8331 from filecoin-project/backport/v1.15.1/fix/snap-worker-cleanup
backport: v1.15.1: feat: sealing: Sector upgrade queue
This commit is contained in:
commit
5ea1502b02
@ -57,8 +57,8 @@ var (
|
|||||||
FullAPIVersion0 = newVer(1, 5, 0)
|
FullAPIVersion0 = newVer(1, 5, 0)
|
||||||
FullAPIVersion1 = newVer(2, 2, 0)
|
FullAPIVersion1 = newVer(2, 2, 0)
|
||||||
|
|
||||||
MinerAPIVersion0 = newVer(1, 4, 0)
|
MinerAPIVersion0 = newVer(1, 5, 0)
|
||||||
WorkerAPIVersion0 = newVer(1, 5, 0)
|
WorkerAPIVersion0 = newVer(1, 6, 0)
|
||||||
)
|
)
|
||||||
|
|
||||||
//nolint:varcheck,deadcode
|
//nolint:varcheck,deadcode
|
||||||
|
43
extern/sector-storage/manager.go
vendored
43
extern/sector-storage/manager.go
vendored
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/mitchellh/go-homedir"
|
"github.com/mitchellh/go-homedir"
|
||||||
|
"go.uber.org/multierr"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
@ -589,7 +590,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fts := storiface.FTUnsealed
|
moveUnsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -597,7 +598,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
|
||||||
fts = storiface.FTNone
|
moveUnsealed = storiface.FTNone
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -616,10 +617,10 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
|
||||||
return err
|
return err
|
||||||
@ -628,22 +629,30 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage)
|
move := func(types storiface.SectorFileType) error {
|
||||||
moveUnsealed := fts
|
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
if len(keepUnsealed) == 0 {
|
||||||
moveUnsealed = storiface.FTNone
|
moveUnsealed = storiface.FTNone
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||||
|
m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove),
|
||||||
|
func(ctx context.Context, w Worker) error {
|
||||||
|
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types))
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("moving sector to storage: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
|
||||||
func(ctx context.Context, w Worker) error {
|
if moveUnsealed != storiface.FTNone {
|
||||||
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed))
|
err = multierr.Append(err, move(moveUnsealed))
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("moving sector to storage: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
2
extern/sector-storage/stores/http_handler.go
vendored
2
extern/sector-storage/stores/http_handler.go
vendored
@ -172,7 +172,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := handler.Local.Remove(r.Context(), id, ft, false, []ID{ID(r.FormValue("keep"))}); err != nil {
|
if err := handler.Local.Remove(r.Context(), id, ft, false, ParseIDList(r.FormValue("keep"))); err != nil {
|
||||||
log.Errorf("%+v", err)
|
log.Errorf("%+v", err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
|
22
extern/sector-storage/stores/index.go
vendored
22
extern/sector-storage/stores/index.go
vendored
@ -7,6 +7,7 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
gopath "path"
|
gopath "path"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -29,6 +30,27 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
|
|||||||
// filesystem, local or networked / shared by multiple machines
|
// filesystem, local or networked / shared by multiple machines
|
||||||
type ID string
|
type ID string
|
||||||
|
|
||||||
|
const IDSep = "."
|
||||||
|
|
||||||
|
type IDList []ID
|
||||||
|
|
||||||
|
func (il IDList) String() string {
|
||||||
|
l := make([]string, len(il))
|
||||||
|
for i, id := range il {
|
||||||
|
l[i] = string(id)
|
||||||
|
}
|
||||||
|
return strings.Join(l, IDSep)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseIDList(s string) IDList {
|
||||||
|
strs := strings.Split(s, IDSep)
|
||||||
|
out := make([]ID, len(strs))
|
||||||
|
for i, str := range strs {
|
||||||
|
out[i] = ID(str)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
type Group = string
|
type Group = string
|
||||||
|
|
||||||
type StorageInfo struct {
|
type StorageInfo struct {
|
||||||
|
42
extern/sector-storage/stores/remote.go
vendored
42
extern/sector-storage/stores/remote.go
vendored
@ -44,12 +44,36 @@ type Remote struct {
|
|||||||
pfHandler PartialFileHandler
|
pfHandler PartialFileHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
|
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, typ storiface.SectorFileType) error {
|
||||||
// TODO: do this on remotes too
|
if bits.OnesCount(uint(typ)) != 1 {
|
||||||
// (not that we really need to do that since it's always called by the
|
return xerrors.New("RemoveCopies expects one file type")
|
||||||
// worker which pulled the copy)
|
}
|
||||||
|
|
||||||
return r.local.RemoveCopies(ctx, s, types)
|
if err := r.local.RemoveCopies(ctx, s, typ); err != nil {
|
||||||
|
return xerrors.Errorf("removing local copies: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
si, err := r.index.StorageFindSector(ctx, s, typ, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", s, typ, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var hasPrimary bool
|
||||||
|
var keep []ID
|
||||||
|
for _, info := range si {
|
||||||
|
if info.Primary {
|
||||||
|
hasPrimary = true
|
||||||
|
keep = append(keep, info.ID)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasPrimary {
|
||||||
|
log.Warnf("remote RemoveCopies: no primary copies of sector %v (%s), not removing anything", s, typ)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Remove(ctx, s, typ, true, keep)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler PartialFileHandler) *Remote {
|
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler PartialFileHandler) *Remote {
|
||||||
@ -156,7 +180,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
|
|
||||||
if op == storiface.AcquireMove {
|
if op == storiface.AcquireMove {
|
||||||
id := ID(storageID)
|
id := ID(storageID)
|
||||||
if err := r.deleteFromRemote(ctx, url, &id); err != nil {
|
if err := r.deleteFromRemote(ctx, url, []ID{id}); err != nil {
|
||||||
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -355,7 +379,7 @@ storeLoop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, url := range info.URLs {
|
for _, url := range info.URLs {
|
||||||
if err := r.deleteFromRemote(ctx, url, nil); err != nil {
|
if err := r.deleteFromRemote(ctx, url, keepIn); err != nil {
|
||||||
log.Warnf("remove %s: %+v", url, err)
|
log.Warnf("remove %s: %+v", url, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -366,9 +390,9 @@ storeLoop:
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn *ID) error {
|
func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn IDList) error {
|
||||||
if keepIn != nil {
|
if keepIn != nil {
|
||||||
url = url + "?keep=" + string(*keepIn)
|
url = url + "?keep=" + keepIn.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Delete %s", url)
|
log.Infof("Delete %s", url)
|
||||||
|
15
extern/sector-storage/worker_local.go
vendored
15
extern/sector-storage/worker_local.go
vendored
@ -516,7 +516,20 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
|
|||||||
|
|
||||||
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
|
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
|
||||||
return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||||
return nil, l.storage.MoveStorage(ctx, sector, types)
|
if err := l.storage.MoveStorage(ctx, sector, types); err != nil {
|
||||||
|
return nil, xerrors.Errorf("move to storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, fileType := range storiface.PathTypes {
|
||||||
|
if fileType&types == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
|
||||||
|
return nil, xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user