Merge pull request #1451 from filecoin-project/feat/secstor-move-sealed
sectorstorage: Move sealed sectors to storage in FinalizeSector
This commit is contained in:
commit
02e5148720
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"github.com/filecoin-project/lotus/lib/lotuslog"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"testing"
|
"testing"
|
||||||
@ -454,6 +455,7 @@ func TestAPIDealFlowReal(t *testing.T) {
|
|||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("skipping test in short mode")
|
t.Skip("skipping test in short mode")
|
||||||
}
|
}
|
||||||
|
lotuslog.SetupLogLevels()
|
||||||
logging.SetLogLevel("miner", "ERROR")
|
logging.SetLogLevel("miner", "ERROR")
|
||||||
logging.SetLogLevel("chainstore", "ERROR")
|
logging.SetLogLevel("chainstore", "ERROR")
|
||||||
logging.SetLogLevel("chain", "ERROR")
|
logging.SetLogLevel("chain", "ERROR")
|
||||||
|
@ -119,9 +119,15 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
|
|||||||
localTasks := []sealtasks.TaskType{
|
localTasks := []sealtasks.TaskType{
|
||||||
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize,
|
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize,
|
||||||
}
|
}
|
||||||
if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1)}
|
if sc.AllowPreCommit1 {
|
||||||
if sc.AllowPreCommit2 { localTasks = append(localTasks, sealtasks.TTPreCommit2)}
|
localTasks = append(localTasks, sealtasks.TTPreCommit1)
|
||||||
if sc.AllowCommit { localTasks = append(localTasks, sealtasks.TTCommit2)}
|
}
|
||||||
|
if sc.AllowPreCommit2 {
|
||||||
|
localTasks = append(localTasks, sealtasks.TTPreCommit2)
|
||||||
|
}
|
||||||
|
if sc.AllowCommit {
|
||||||
|
localTasks = append(localTasks, sealtasks.TTCommit2)
|
||||||
|
}
|
||||||
|
|
||||||
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
|
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
|
||||||
SealProof: cfg.SealProofType,
|
SealProof: cfg.SealProofType,
|
||||||
|
@ -15,7 +15,7 @@ type readonlyProvider struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
|
||||||
if allocate != 0 { // 0 - don't allocate anything
|
if allocate != stores.FTNone {
|
||||||
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
|
return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
8
storage/sectorstorage/stores/filetype.go
Normal file
8
storage/sectorstorage/stores/filetype.go
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import "github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
|
||||||
|
const (
|
||||||
|
// TODO: move the other types here after we drop go-sectorbuilder
|
||||||
|
FTNone sectorbuilder.SectorFileType = 0
|
||||||
|
)
|
@ -71,7 +71,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
|||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false)
|
paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, FTNone, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("%+v", err)
|
log.Error("%+v", err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
|
@ -13,6 +13,10 @@ import (
|
|||||||
type Store interface {
|
type Store interface {
|
||||||
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error)
|
||||||
Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
|
Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
|
||||||
|
|
||||||
|
// move sectors into storage
|
||||||
|
MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error
|
||||||
|
|
||||||
FsStat(ctx context.Context, id ID) (FsStat, error)
|
FsStat(ctx context.Context, id ID) (FsStat, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,6 +309,63 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error {
|
||||||
|
dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||||
|
}
|
||||||
|
defer sdone()
|
||||||
|
|
||||||
|
src, srcIds, ddone, err := st.AcquireSector(ctx, s, types, FTNone, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("acquire src storage: %w", err)
|
||||||
|
}
|
||||||
|
defer ddone()
|
||||||
|
|
||||||
|
for _, fileType := range pathTypes {
|
||||||
|
if fileType&types == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(srcIds, fileType)))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to get source storage info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(destIds, fileType)))
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("failed to get source storage info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sst.ID == dst.ID {
|
||||||
|
log.Debugf("not moving %v(%d); src and dest are the same", s, fileType)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if sst.CanStore {
|
||||||
|
log.Debugf("not moving %v(%d); source supports storage", s, fileType)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore)
|
||||||
|
|
||||||
|
if err := st.index.StorageDropSector(ctx, ID(sectorutil.PathByType(srcIds, fileType)), s, fileType); err != nil {
|
||||||
|
return xerrors.Errorf("dropping source sector from index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := move(sectorutil.PathByType(src, fileType), sectorutil.PathByType(dest, fileType)); err != nil {
|
||||||
|
// TODO: attempt some recovery (check if src is still there, re-declare)
|
||||||
|
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := st.index.StorageDeclareSector(ctx, ID(sectorutil.PathByType(destIds, fileType)), s, fileType); err != nil {
|
||||||
|
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(sectorutil.PathByType(destIds, fileType)), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
var errPathNotFound = xerrors.Errorf("fsstat: path not found")
|
||||||
|
|
||||||
func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
||||||
|
@ -98,7 +98,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
return si[i].Weight < si[j].Weight
|
return si[i].Weight < si[j].Weight
|
||||||
})
|
})
|
||||||
|
|
||||||
apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
|
apaths, ids, done, err := r.local.AcquireSector(ctx, s, FTNone, fileType, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||||
}
|
}
|
||||||
@ -107,6 +107,8 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
|
|||||||
|
|
||||||
var merr error
|
var merr error
|
||||||
for _, info := range si {
|
for _, info := range si {
|
||||||
|
// TODO: see what we have local, prefer that
|
||||||
|
|
||||||
for _, url := range info.URLs {
|
for _, url := range info.URLs {
|
||||||
err := r.fetch(ctx, url, dest)
|
err := r.fetch(ctx, url, dest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -174,6 +176,17 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error {
|
||||||
|
// Make sure we have the data local
|
||||||
|
_, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("acquire src storage (remote): %w", err)
|
||||||
|
}
|
||||||
|
ddone()
|
||||||
|
|
||||||
|
return r.local.MoveStorage(ctx, s, types)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error {
|
||||||
if bits.OnesCount(uint(typ)) != 1 {
|
if bits.OnesCount(uint(typ)) != 1 {
|
||||||
return xerrors.New("delete expects one file type")
|
return xerrors.New("delete expects one file type")
|
||||||
|
43
storage/sectorstorage/stores/util_unix.go
Normal file
43
storage/sectorstorage/stores/util_unix.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/mitchellh/go-homedir"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func move(from, to string) error {
|
||||||
|
from, err := homedir.Expand(from)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("move: expanding from: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
to, err = homedir.Expand(to)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("move: expanding to: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if filepath.Base(from) != filepath.Base(to) {
|
||||||
|
return xerrors.Errorf("move: base names must match ('%s' != '%s')", filepath.Base(from), filepath.Base(to))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugw("move sector data", "from", from, "to", to)
|
||||||
|
|
||||||
|
toDir := filepath.Dir(to)
|
||||||
|
|
||||||
|
// `mv` has decades of experience in moving files quickly; don't pretend we
|
||||||
|
// can do better
|
||||||
|
|
||||||
|
var errOut bytes.Buffer
|
||||||
|
cmd := exec.Command("/usr/bin/env", "mv", "-t", toDir, from)
|
||||||
|
cmd.Stderr = &errOut
|
||||||
|
if err := cmd.Run(); err != nil {
|
||||||
|
return xerrors.Errorf("exec mv (stderr: %s): %w", strings.TrimSpace(errOut.String()), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -160,6 +160,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e
|
|||||||
return xerrors.Errorf("removing unsealed data: %w", err)
|
return xerrors.Errorf("removing unsealed data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := l.storage.MoveStorage(ctx, sector, sectorbuilder.FTSealed|sectorbuilder.FTCache); err != nil {
|
||||||
|
return xerrors.Errorf("moving sealed data to storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user