workers: Transfer logic refactoring

This commit is contained in:
Łukasz Magiera 2020-03-11 06:49:17 +01:00
parent eb61a36fd7
commit 7e2e30f94b
12 changed files with 185 additions and 277 deletions

View File

@ -134,6 +134,16 @@ type StorageInfo struct {
CanStore bool
}
type StoragePath struct {
ID string
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
type SealRes struct {
Err string
GoErr error `json:"-"`

View File

@ -14,6 +14,7 @@ type WorkerApi interface {
// TODO: Info() (name, ...) ?
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error) // TaskType -> Weight
Paths(context.Context) ([]StoragePath, error)
storage.Sealer
}

View File

@ -199,6 +199,7 @@ type WorkerStruct struct {
Version func(context.Context) (build.Version, error) `perm:"admin"`
TaskTypes func(context.Context) (map[sealmgr.TaskType]struct{}, error) `perm:"admin"`
Paths func(context.Context) ([]api.StoragePath, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorNumber, storage.PreCommit1Out) (sealedCID cid.Cid, unsealedCID cid.Cid, err error) `perm:"admin"`
@ -698,6 +699,10 @@ func (w *WorkerStruct) TaskTypes(ctx context.Context) (map[sealmgr.TaskType]stru
return w.Internal.TaskTypes(ctx)
}
func (w *WorkerStruct) Paths(ctx context.Context) ([]api.StoragePath, error) {
return w.Internal.Paths(ctx)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sectorNum, ticket, pieces)
}

View File

@ -3,13 +3,14 @@ package main
import (
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
)
type worker struct {
spt abi.RegisteredProof
}
func (w *worker) SealPreCommit1(ctx context.Context, sectorNum abi.SectorNumber, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {

View File

@ -2,6 +2,8 @@ package main
import (
"context"
"net/http"
"sort"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -11,12 +13,32 @@ import (
type workerStorage struct {
path string // TODO: multi-path support
mid abi.ActorID
auth http.Header
api api.StorageMiner
}
func (w *workerStorage) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
w.api.WorkerFindSector()
asid := abi.SectorID{
Miner: w.mid,
Number: id,
}
// extract local storage; prefer
si, err := w.api.WorkerFindSector(ctx, asid, existing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}
sort.Slice(si, func(i, j int) bool {
return si[i].Cost < si[j].Cost
})
best := si[0].URLs // TODO: not necessarily true
w.fetch(best, )
}
var _ sectorbuilder.SectorProvider = &workerStorage{}

View File

@ -1,37 +1,19 @@
package main
/*
import (
"fmt"
"io"
"mime"
"net/http"
"os"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/go-sectorbuilder/fs"
"github.com/filecoin-project/specs-actors/actors/abi"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1"
"path/filepath"
"github.com/filecoin-project/lotus/lib/tarutil"
)
func (w *worker) sizeForType(typ string) int64 {
size := int64(w.sb.SectorSize())
if typ == "cache" {
size *= 10
}
return size
}
func (w *worker) fetch(typ string, sectorID abi.SectorNumber) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID)
log.Infof("Fetch %s %s", typ, url)
func (w *workerStorage) fetch(url, outname string) error {
log.Infof("Fetch %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
@ -49,7 +31,7 @@ func (w *worker) fetch(typ string, sectorID abi.SectorNumber) error {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
bar := pb.New64(w.sizeForType(typ))
/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
@ -57,7 +39,7 @@ func (w *worker) fetch(typ string, sectorID abi.SectorNumber) error {
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()
defer bar.Finish()*/
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
@ -70,109 +52,11 @@ func (w *worker) fetch(typ string, sectorID abi.SectorNumber) error {
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(barreader, outname)
return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(barreader), outname)
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}
func (w *worker) push(typ string, sectorID abi.SectorNumber) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID)
if err != nil {
return err
}
url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID)
log.Infof("Push %s %s", typ, url)
stat, err := os.Stat(string(filename))
if err != nil {
return err
}
var r io.Reader
if stat.IsDir() {
r, err = tarutil.TarDirectory(string(filename))
} else {
r, err = os.OpenFile(string(filename), os.O_RDONLY, 0644)
}
if err != nil {
return xerrors.Errorf("opening push reader: %w", err)
}
bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.ShowCounters = true
bar.Units = pb.U_BYTES
bar.Start()
defer bar.Finish()
//todo set content size
header := w.auth
if stat.IsDir() {
header.Set("Content-Type", "application/x-tar")
} else {
header.Set("Content-Type", "application/octet-stream")
}
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r))
if err != nil {
return err
}
req.Header = header
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
}
if err := resp.Body.Close(); err != nil {
return err
}
// TODO: keep files around for later stages of sealing
return w.remove(typ, sectorID)
}
func (w *worker) remove(typ string, sectorID abi.SectorNumber) error {
filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
return os.RemoveAll(filename)
}
func (w *worker) fetchSector(sectorID abi.SectorNumber, typ sectorbuilder.WorkerTaskType) error {
w.limiter.transferLimit <- struct{}{}
defer func() {
<-w.limiter.transferLimit
}()
var err error
switch typ {
case sectorbuilder.WorkerPreCommit:
err = w.fetch("staging", sectorID)
case sectorbuilder.WorkerCommit:
err = w.fetch("sealed", sectorID)
if err != nil {
return xerrors.Errorf("fetch sealed: %w", err)
}
err = w.fetch("cache", sectorID)
}
if err != nil {
return xerrors.Errorf("fetch failed: %w", err)
}
return nil
}
*/

View File

@ -7,7 +7,6 @@ import (
"os"
"strconv"
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -48,122 +47,7 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
return
}
mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{id}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{id}", sm.remotePutSector).Methods("PUT")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r)
}
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
stat, err := os.Stat(string(path))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(string(path))
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil {
log.Error(err)
return
}*/
}
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
panic("todo")
/* vars := mux.Vars(r)
id, err := strconv.ParseUint(vars["id"], 10, 64)
if err != nil {
log.Error("parsing sector id: ", err)
w.WriteHeader(500)
return
}
// This is going to get better with worker-to-worker transfers
path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id))
if err != nil {
if err != fs.ErrNotFound {
log.Error(err)
w.WriteHeader(500)
return
}
path, err = sm.SectorBuilder.AllocSectorPath(fs.DataType(vars["type"]), abi.SectorNumber(id), true)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
if err := os.RemoveAll(string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
switch mediatype {
case "application/x-tar":
if err := tarutil.ExtractTar(r.Body, string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
default:
if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
}
w.WriteHeader(200)
log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength)*/
sm.StorageMgr.ServeHTTP(w, r)
}
/*

View File

@ -4,13 +4,15 @@ import (
"context"
"io"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr"
)
@ -100,8 +102,8 @@ func (l *localWorker) TaskTypes(context.Context) (map[sealmgr.TaskType]struct{},
}, nil
}
func (l *localWorker) Paths() []Path {
return l.storage.local()
func (l *localWorker) Paths(context.Context) ([]api.StoragePath, error) {
return l.storage.local(), nil
}
var _ Worker = &localWorker{}

View File

@ -3,6 +3,7 @@ package advmgr
import (
"context"
"io"
"net/http"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
@ -11,6 +12,8 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
@ -24,26 +27,11 @@ type SectorIDCounter interface {
Next() (abi.SectorNumber, error)
}
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error
}
type Path struct {
ID string
Weight uint64
LocalPath string
CanSeal bool
CanStore bool
}
type Worker interface {
sectorbuilder.Sealer
TaskTypes(context.Context) (map[sealmgr.TaskType]struct{}, error)
Paths() []Path
Paths(context.Context) ([]api.StoragePath, error)
}
type Manager struct {
@ -107,6 +95,10 @@ func (m *Manager) AddLocalStorage(path string) error {
return nil
}
func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.storage.ServeHTTP(w, r)
}
func (m *Manager) SectorSize() abi.SectorSize {
sz, _ := m.scfg.SealProofType.SectorSize()
return sz
@ -134,16 +126,22 @@ func (m *Manager) getWorkersByPaths(task sealmgr.TaskType, inPaths []config.Stor
continue
}
phs, err := worker.Paths(context.TODO())
if err != nil {
log.Errorf("error getting worker paths: %+v", err)
continue
}
// check if the worker has access to the path we selected
var st *config.StorageMeta
for _, p := range worker.Paths() {
for _, m := range inPaths {
if p.ID == m.ID {
for _, p := range phs {
for _, meta := range inPaths {
if p.ID == meta.ID {
if st != nil && st.Weight > p.Weight {
continue
}
p := m // copy
p := meta // copy
st = &p
}
}

View File

@ -9,7 +9,6 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/api/client"
)
@ -18,15 +17,11 @@ type remote struct {
}
func (r *remote) AddPiece(ctx context.Context, sector abi.SectorNumber, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
panic("implement me")
}
func (r *remote) Paths() []Path {
panic("implement me")
return abi.PieceInfo{},xerrors.New("unsupported")
}
func ConnectRemote(ctx context.Context, fa api.FullNode, url string) (*remote, error) {
token, err := fa.AuthNew(ctx, []api.Permission{apistruct.PermAdmin})
token, err := fa.AuthNew(ctx, []api.Permission{"admin"})
if err != nil {
return nil, xerrors.Errorf("creating auth token for remote connection: %w", err)
}

View File

@ -3,14 +3,20 @@ package advmgr
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"sync"
"github.com/gorilla/mux"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/node/config"
@ -129,6 +135,9 @@ func (st *storage) acquireSector(mid abi.ActorID, id abi.SectorNumber, existing
if s&fileType == 0 {
continue
}
if p.local == "" {
continue // TODO: fetch
}
spath := filepath.Join(p.local, fileType.String(), fmt.Sprintf("s-t0%d-%d", mid, id))
@ -239,14 +248,14 @@ func (st *storage) findSector(mid abi.ActorID, sn abi.SectorNumber, typ sectorbu
return out, nil
}
func (st *storage) local() []Path {
var out []Path
func (st *storage) local() []api.StoragePath {
var out []api.StoragePath
for _, p := range st.paths {
if p.local == "" {
continue
}
out = append(out, Path{
out = append(out, api.StoragePath{
ID: p.meta.ID,
Weight: p.meta.Weight,
LocalPath: p.local,
@ -258,6 +267,93 @@ func (st *storage) local() []Path {
return out
}
func (st *storage) ServeHTTP(w http.ResponseWriter, r *http.Request) { // /storage/
mux := mux.NewRouter()
mux.HandleFunc("/{type}/{id}", st.remoteGetSector).Methods("GET")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r)
}
func (st *storage) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := parseSectorID(vars["id"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
ft, err := ftFromString(vars["type"])
if err != nil {
return
}
paths, done, err := st.acquireSector(id.Miner, id.Number, ft, 0, false)
if err != nil {
return
}
defer done()
var path string
switch ft {
case sectorbuilder.FTUnsealed:
path = paths.Unsealed
case sectorbuilder.FTSealed:
path = paths.Sealed
case sectorbuilder.FTCache:
path = paths.Cache
}
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)
return
}
stat, err := os.Stat(path)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
var rd io.Reader
if stat.IsDir() {
rd, err = tarutil.TarDirectory(path)
w.Header().Set("Content-Type", "application/x-tar")
} else {
rd, err = os.OpenFile(path, os.O_RDONLY, 0644)
w.Header().Set("Content-Type", "application/octet-stream")
}
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
w.WriteHeader(200)
if _, err := io.Copy(w, rd); err != nil { // TODO: default 32k buf may be too small
log.Error(err)
return
}
}
func ftFromString(t string) (sectorbuilder.SectorFileType, error) {
switch t {
case sectorbuilder.FTUnsealed.String():
return sectorbuilder.FTUnsealed, nil
case sectorbuilder.FTSealed.String():
return sectorbuilder.FTSealed, nil
case sectorbuilder.FTCache.String():
return sectorbuilder.FTCache, nil
default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
}
}
func parseSectorID(baseName string) (abi.SectorID, error) {
var n abi.SectorNumber
var mid abi.ActorID

View File

@ -0,0 +1,10 @@
package advmgr
import (
"github.com/filecoin-project/lotus/node/config"
)
type LocalStorage interface {
GetStorage() (config.StorageConfig, error)
SetStorage(func(*config.StorageConfig)) error
}