workers: Basic Remote store

This commit is contained in:
Łukasz Magiera 2020-03-13 01:23:05 +01:00
parent 56968d858c
commit 86871e5abc
11 changed files with 201 additions and 190 deletions

View File

@ -4,7 +4,6 @@ import (
"os"
logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
@ -118,47 +117,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("get params: %w", err)
}
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
_, storageAddr, err := manet.DialArgs(ainfo.Addr)
/*
/*ppt, spt, err := api.ProofTypeFromSectorSize(ssize)
if err != nil {
return err
}
/*sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SealProofType: spt,
PoStProofType: ppt,
Miner: act,
WorkerThreads: workers,
Paths: sectorbuilder.SimplePath(r),
})
if err != nil {
return err
}
nQueues := workers + transfers
var wg sync.WaitGroup
wg.Add(nQueues)
for i := 0; i < nQueues; i++ {
go func() {
defer wg.Done()
/* if err := acceptJobs(ctx, nodeApi, sb, limiter, "http://"+storageAddr, ainfo.AuthHeader(), r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")); err != nil {
log.Warnf("%+v", err)
return
}
}()
}
wg.Wait()*/
return nil
},
}

View File

@ -1,13 +1,12 @@
package main
import (
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/go-sectorbuilder"
)
type worker struct { // TODO: use advmgr.LocalWorker here
sectorbuilder.Basic
*advmgr.LocalWorker
}
var _ storage.Sealer = &worker{}

View File

@ -1,53 +0,0 @@
package main
import (
"context"
"net/http"
"sort"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
type workerStorage struct {
path string // TODO: multi-path support
mid abi.ActorID // ewwhh TODO: passthru in sectobuilder/ffi
local *stores.Local
auth http.Header
api api.StorageMiner
}
func (w *workerStorage) AcquireSector(ctx context.Context, id abi.SectorNumber, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
asid := abi.SectorID{
Miner: w.mid,
Number: id,
}
// extract local storage; prefer
si, err := w.api.WorkerFindSector(ctx, asid, existing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}
sort.Slice(si, func(i, j int) bool {
return si[i].Cost < si[j].Cost
})
best := si[0].URLs // TODO: not necessarily true
sname := sectorutil.SectorName(abi.SectorID{
Miner: w.mid,
Number: id,
})
w.fetch(best, )
}
var _ sectorbuilder.SectorProvider = &workerStorage{}

View File

@ -1,62 +0,0 @@
package main
import (
"mime"
"net/http"
"os"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/tarutil"
)
func (w *workerStorage) fetch(url, outname string) error {
log.Infof("Fetch %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = w.auth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()*/
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}

2
go.mod
View File

@ -117,5 +117,3 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder

2
go.sum
View File

@ -96,6 +96,7 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY=
github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
@ -121,6 +122,7 @@ github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd h1
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd/go.mod h1:rfRwhd3ujcCXnD4N9oEM2wjh8GRZGoeNXME+UPG/9ts=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA=
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=

View File

@ -2,6 +2,7 @@ package sectorutil
import (
"fmt"
"github.com/filecoin-project/go-sectorbuilder"
"golang.org/x/xerrors"
@ -29,3 +30,27 @@ func ParseSectorID(baseName string) (abi.SectorID, error) {
func SectorName(sid abi.SectorID) string {
return fmt.Sprintf("s-t0%d-%d", sid.Miner, sid.Number)
}
func PathByType(sps sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType) string {
switch fileType {
case sectorbuilder.FTUnsealed:
return sps.Unsealed
case sectorbuilder.FTSealed:
return sps.Sealed
case sectorbuilder.FTCache:
return sps.Cache
}
panic("requested unknown path type")
}
func SetPathByType(sps *sectorbuilder.SectorPaths, fileType sectorbuilder.SectorFileType, p string) {
switch fileType {
case sectorbuilder.FTUnsealed:
sps.Unsealed = p
case sectorbuilder.FTSealed:
sps.Sealed = p
case sectorbuilder.FTCache:
sps.Cache = p
}
}

View File

@ -50,15 +50,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
}
defer done()
var path string
switch ft {
case sectorbuilder.FTUnsealed:
path = paths.Unsealed
case sectorbuilder.FTSealed:
path = paths.Sealed
case sectorbuilder.FTCache:
path = paths.Cache
}
path := sectorutil.PathByType(paths, ft)
if path == "" {
log.Error("acquired path was empty")
w.WriteHeader(500)

View File

@ -1,11 +1,12 @@
package stores
import (
"github.com/filecoin-project/specs-actors/actors/abi"
"context"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
)
type Store interface {
AcquireSector(s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error)
AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error)
}

View File

@ -143,15 +143,7 @@ func (st *Local) AcquireSector(sid abi.SectorID, existing sectorbuilder.SectorFi
}
spath := filepath.Join(p.local, fileType.String(), sectorutil.SectorName(sid))
switch fileType {
case sectorbuilder.FTUnsealed:
out.Unsealed = spath
case sectorbuilder.FTSealed:
out.Sealed = spath
case sectorbuilder.FTCache:
out.Cache = spath
}
sectorutil.SetPathByType(&out, fileType, spath)
existing ^= fileType
}
@ -188,15 +180,7 @@ func (st *Local) AcquireSector(sid abi.SectorID, existing sectorbuilder.SectorFi
return sectorbuilder.SectorPaths{}, nil, xerrors.Errorf("couldn't find a suitable path for a sector")
}
switch fileType {
case sectorbuilder.FTUnsealed:
out.Unsealed = best
case sectorbuilder.FTSealed:
out.Sealed = best
case sectorbuilder.FTCache:
out.Cache = best
}
sectorutil.SetPathByType(&out, fileType, best)
allocate ^= fileType
}

View File

@ -0,0 +1,166 @@
package stores
import (
"context"
"mime"
"net/http"
"os"
"sort"
"sync"
"github.com/hashicorp/go-multierror"
files "github.com/ipfs/go-ipfs-files"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/tarutil"
"github.com/filecoin-project/lotus/storage/sealmgr/sectorutil"
)
type Remote struct {
local Store
remote SectorIndex
auth http.Header
fetchLk sync.Mutex // TODO: this can be much smarter
// TODO: allow multiple parallel fetches
// (make sure to not fetch the same sector data twice)
}
type SectorIndex interface {
FindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]api.StorageInfo, error)
}
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing sectorbuilder.SectorFileType, allocate sectorbuilder.SectorFileType, sealing bool) (sectorbuilder.SectorPaths, func(), error) {
if existing|allocate != existing^allocate {
return sectorbuilder.SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
}
r.fetchLk.Lock()
defer r.fetchLk.Unlock()
paths, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
if err != nil {
return sectorbuilder.SectorPaths{}, nil, err
}
for _, fileType := range pathTypes {
if fileType&existing == 0 {
continue
}
if sectorutil.PathByType(paths, fileType) != "" {
continue
}
ap, rdone, err := r.acquireFromRemote(ctx, s, fileType, sealing)
if err != nil {
done()
return sectorbuilder.SectorPaths{}, nil, err
}
done = mergeDone(done, rdone)
sectorutil.SetPathByType(&paths, fileType, ap)
}
return paths, done, nil
}
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType sectorbuilder.SectorFileType, sealing bool) (string, func(), error) {
si, err := r.remote.FindSector(ctx, s, fileType)
if err != nil {
return "", nil, err
}
sort.Slice(si, func(i, j int) bool {
return si[i].Cost < si[j].Cost
})
apaths, done, err := r.local.AcquireSector(ctx, s, 0, fileType, sealing)
if err != nil {
return "", nil, xerrors.Errorf("allocate local sector for fetching: %w", err)
}
dest := sectorutil.PathByType(apaths, fileType)
var merr error
for _, info := range si {
for _, url := range info.URLs {
err := r.fetch(url, dest)
if err != nil {
merr = multierror.Append(merr, xerrors.Errorf("fetch error %s (storage %s) -> %s: %w", url, info.ID, dest, err))
continue
}
if merr != nil {
log.Warnw("acquireFromRemote encountered errors when fetching sector from remote", "errors", merr)
}
return dest, done, nil
}
}
done()
return "", nil, xerrors.Errorf("failed to acquire sector %v from remote: %w", s, merr)
}
func (r *Remote) fetch(url, outname string) error {
log.Infof("Fetch %s -> %s", url, outname)
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return xerrors.Errorf("request: %w", err)
}
req.Header = r.auth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return xerrors.Errorf("do request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 code: %d", resp.StatusCode)
}
/*bar := pb.New64(w.sizeForType(typ))
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
barreader := bar.NewProxyReader(resp.Body)
bar.Start()
defer bar.Finish()*/
mediatype, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return xerrors.Errorf("parse media type: %w", err)
}
if err := os.RemoveAll(outname); err != nil {
return xerrors.Errorf("removing dest: %w", err)
}
switch mediatype {
case "application/x-tar":
return tarutil.ExtractTar(resp.Body, outname)
case "application/octet-stream":
return files.WriteTo(files.NewReaderFile(resp.Body), outname)
default:
return xerrors.Errorf("unknown content type: '%s'", mediatype)
}
}
func mergeDone(a func(), b func()) func() {
return func() {
a()
b()
}
}
var _ Store = &Remote{}