diff --git a/node/config/def.go b/node/config/def.go index edb00aa5e..70fddf64a 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -56,7 +56,7 @@ type Storage struct { // Local worker config AllowPreCommit1 bool AllowPreCommit2 bool - AllowCommit bool + AllowCommit bool } func defCommon() Common { diff --git a/node/node_test.go b/node/node_test.go index e7884c377..ef11e5fff 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/rand" + "github.com/filecoin-project/lotus/lib/lotuslog" "io/ioutil" "net/http/httptest" "testing" @@ -454,6 +455,7 @@ func TestAPIDealFlowReal(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode") } + lotuslog.SetupLogLevels() logging.SetLogLevel("miner", "ERROR") logging.SetLogLevel("chainstore", "ERROR") logging.SetLogLevel("chain", "ERROR") diff --git a/storage/sectorstorage/manager.go b/storage/sectorstorage/manager.go index 3ef8e30c6..6f4bd58e2 100644 --- a/storage/sectorstorage/manager.go +++ b/storage/sectorstorage/manager.go @@ -119,9 +119,15 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg localTasks := []sealtasks.TaskType{ sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, } - if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1)} - if sc.AllowPreCommit2 { localTasks = append(localTasks, sealtasks.TTPreCommit2)} - if sc.AllowCommit { localTasks = append(localTasks, sealtasks.TTCommit2)} + if sc.AllowPreCommit1 { + localTasks = append(localTasks, sealtasks.TTPreCommit1) + } + if sc.AllowPreCommit2 { + localTasks = append(localTasks, sealtasks.TTPreCommit2) + } + if sc.AllowCommit { + localTasks = append(localTasks, sealtasks.TTCommit2) + } err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ SealProof: cfg.SealProofType, diff --git a/storage/sectorstorage/roprov.go b/storage/sectorstorage/roprov.go index 4b0dfbe2b..8355500b5 100644 --- a/storage/sectorstorage/roprov.go +++ b/storage/sectorstorage/roprov.go @@ -15,7 +15,7 @@ type readonlyProvider struct { } func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) { - if allocate != 0 { // 0 - don't allocate anything + if allocate != stores.FTNone { return sectorbuilder.SectorPaths{}, nil, xerrors.New("read-only storage") } diff --git a/storage/sectorstorage/stores/filetype.go b/storage/sectorstorage/stores/filetype.go new file mode 100644 index 000000000..e85db1e53 --- /dev/null +++ b/storage/sectorstorage/stores/filetype.go @@ -0,0 +1,8 @@ +package stores + +import "github.com/filecoin-project/go-sectorbuilder" + +const ( + // TODO: move the other types here after we drop go-sectorbuilder + FTNone sectorbuilder.SectorFileType = 0 +) diff --git a/storage/sectorstorage/stores/http_handler.go b/storage/sectorstorage/stores/http_handler.go index b242d1159..a49349901 100644 --- a/storage/sectorstorage/stores/http_handler.go +++ b/storage/sectorstorage/stores/http_handler.go @@ -71,7 +71,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ w.WriteHeader(500) return } - paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, 0, false) + paths, _, done, err := handler.Local.AcquireSector(r.Context(), id, ft, FTNone, false) if err != nil { log.Error("%+v", err) w.WriteHeader(500) diff --git a/storage/sectorstorage/stores/interface.go b/storage/sectorstorage/stores/interface.go index e56a6d74a..45e371fb7 100644 --- a/storage/sectorstorage/stores/interface.go +++ b/storage/sectorstorage/stores/interface.go @@ -13,6 +13,10 @@ import ( type Store interface { AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (paths sectorbuilder.SectorPaths, stores sectorbuilder.SectorPaths, done func(), err error) Remove(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error + + // move sectors into storage + MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error + FsStat(ctx context.Context, id ID) (FsStat, error) } diff --git a/storage/sectorstorage/stores/local.go b/storage/sectorstorage/stores/local.go index d891f86fa..bc2e56a69 100644 --- a/storage/sectorstorage/stores/local.go +++ b/storage/sectorstorage/stores/local.go @@ -309,6 +309,63 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder return nil } +func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error { + dest, destIds, sdone, err := st.AcquireSector(ctx, s, FTNone, types, false) + if err != nil { + return xerrors.Errorf("acquire dest storage: %w", err) + } + defer sdone() + + src, srcIds, ddone, err := st.AcquireSector(ctx, s, types, FTNone, false) + if err != nil { + return xerrors.Errorf("acquire src storage: %w", err) + } + defer ddone() + + for _, fileType := range pathTypes { + if fileType&types == 0 { + continue + } + + sst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(srcIds, fileType))) + if err != nil { + return xerrors.Errorf("failed to get source storage info: %w", err) + } + + dst, err := st.index.StorageInfo(ctx, ID(sectorutil.PathByType(destIds, fileType))) + if err != nil { + return xerrors.Errorf("failed to get source storage info: %w", err) + } + + if sst.ID == dst.ID { + log.Debugf("not moving %v(%d); src and dest are the same", s, fileType) + continue + } + + if sst.CanStore { + log.Debugf("not moving %v(%d); source supports storage", s, fileType) + continue + } + + log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore) + + if err := st.index.StorageDropSector(ctx, ID(sectorutil.PathByType(srcIds, fileType)), s, fileType); err != nil { + return xerrors.Errorf("dropping source sector from index: %w", err) + } + + if err := move(sectorutil.PathByType(src, fileType), sectorutil.PathByType(dest, fileType)); err != nil { + // TODO: attempt some recovery (check if src is still there, re-declare) + return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) + } + + if err := st.index.StorageDeclareSector(ctx, ID(sectorutil.PathByType(destIds, fileType)), s, fileType); err != nil { + return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(sectorutil.PathByType(destIds, fileType)), err) + } + } + + return nil +} + var errPathNotFound = xerrors.Errorf("fsstat: path not found") func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) { diff --git a/storage/sectorstorage/stores/remote.go b/storage/sectorstorage/stores/remote.go index 66b818434..e44b8cfec 100644 --- a/storage/sectorstorage/stores/remote.go +++ b/storage/sectorstorage/stores/remote.go @@ -98,7 +98,7 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType return si[i].Weight < si[j].Weight }) - apaths, ids, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing) + apaths, ids, done, err := r.local.AcquireSector(ctx, s, FTNone, fileType, sealing) if err != nil { return "", "", "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err) } @@ -107,6 +107,8 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType var merr error for _, info := range si { + // TODO: see what we have local, prefer that + for _, url := range info.URLs { err := r.fetch(ctx, url, dest) if err != nil { @@ -174,6 +176,17 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error { } } +func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, types sectorbuilder.SectorFileType) error { + // Make sure we have the data local + _, _, ddone, err := r.AcquireSector(ctx, s, types, FTNone, false) + if err != nil { + return xerrors.Errorf("acquire src storage (remote): %w", err) + } + ddone() + + return r.local.MoveStorage(ctx, s, types) +} + func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ sectorbuilder.SectorFileType) error { if bits.OnesCount(uint(typ)) != 1 { return xerrors.New("delete expects one file type") diff --git a/storage/sectorstorage/stores/util_unix.go b/storage/sectorstorage/stores/util_unix.go new file mode 100644 index 000000000..eeb691ddf --- /dev/null +++ b/storage/sectorstorage/stores/util_unix.go @@ -0,0 +1,43 @@ +package stores + +import ( + "bytes" + "os/exec" + "path/filepath" + "strings" + + "github.com/mitchellh/go-homedir" + "golang.org/x/xerrors" +) + +func move(from, to string) error { + from, err := homedir.Expand(from) + if err != nil { + return xerrors.Errorf("move: expanding from: %w", err) + } + + to, err = homedir.Expand(to) + if err != nil { + return xerrors.Errorf("move: expanding to: %w", err) + } + + if filepath.Base(from) != filepath.Base(to) { + return xerrors.Errorf("move: base names must match ('%s' != '%s')", filepath.Base(from), filepath.Base(to)) + } + + log.Debugw("move sector data", "from", from, "to", to) + + toDir := filepath.Dir(to) + + // `mv` has decades of experience in moving files quickly; don't pretend we + // can do better + + var errOut bytes.Buffer + cmd := exec.Command("/usr/bin/env", "mv", "-t", toDir, from) + cmd.Stderr = &errOut + if err := cmd.Run(); err != nil { + return xerrors.Errorf("exec mv (stderr: %s): %w", strings.TrimSpace(errOut.String()), err) + } + + return nil +} diff --git a/storage/sectorstorage/worker_local.go b/storage/sectorstorage/worker_local.go index 175106bad..de3f19c89 100644 --- a/storage/sectorstorage/worker_local.go +++ b/storage/sectorstorage/worker_local.go @@ -160,6 +160,10 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) e return xerrors.Errorf("removing unsealed data: %w", err) } + if err := l.storage.MoveStorage(ctx, sector, sectorbuilder.FTSealed|sectorbuilder.FTCache); err != nil { + return xerrors.Errorf("moving sealed data to storage: %w", err) + } + return nil }