remote-worker: wire up storage miner endpoints

This commit is contained in:
Łukasz Magiera 2019-11-21 15:10:51 +01:00
parent 9725eb78bf
commit ba3ad75670
11 changed files with 170 additions and 21 deletions

View File

@ -3,7 +3,6 @@ package api
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
@ -54,6 +53,8 @@ type StorageMiner interface {
ActorAddress(context.Context) (address.Address, error)
ActorSectorSize(context.Context, address.Address) (uint64, error)
// Temp api for testing
StoreGarbageData(context.Context) error

View File

@ -133,7 +133,8 @@ type StorageMinerStruct struct {
CommonStruct
Internal struct {
ActorAddress func(context.Context) (address.Address, error) `perm:"read"`
ActorAddress func(context.Context) (address.Address, error) `perm:"read"`
ActorSectorSize func(context.Context, address.Address) (uint64, error) `perm:"read"`
StoreGarbageData func(context.Context) error `perm:"write"`
@ -504,6 +505,10 @@ func (c *StorageMinerStruct) ActorAddress(ctx context.Context) (address.Address,
return c.Internal.ActorAddress(ctx)
}
func (c *StorageMinerStruct) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) {
return c.Internal.ActorSectorSize(ctx, addr)
}
func (c *StorageMinerStruct) StoreGarbageData(ctx context.Context) error {
return c.Internal.StoreGarbageData(ctx)
}

View File

@ -8,6 +8,7 @@ import (
"os/signal"
"syscall"
mux "github.com/gorilla/mux"
"github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
"golang.org/x/xerrors"
@ -19,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/lib/auth"
"github.com/filecoin-project/lotus/lib/jsonrpc"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/repo"
)
@ -118,17 +120,21 @@ var runCmd = &cli.Command{
return xerrors.Errorf("could not listen: %w", err)
}
mux := mux.NewRouter()
rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi))
mux.Handle("/rpc/v0", rpcServer)
mux.HandleFunc("/remote", minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.Handle("/", http.DefaultServeMux) // pprof
ah := &auth.Handler{
Verify: minerapi.AuthVerify,
Next: rpcServer.ServeHTTP,
Next: mux.ServeHTTP,
}
http.Handle("/rpc/v0", ah)
srv := &http.Server{Handler: http.DefaultServeMux}
srv := &http.Server{Handler: ah}
sigChan := make(chan os.Signal, 2)
go func() {

View File

@ -1,14 +1,13 @@
package lotus_worker
package main
import (
"net/http"
"os"
"github.com/filecoin-project/lotus/api"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
)
@ -52,7 +51,7 @@ func main() {
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus fountain",
Usage: "Start lotus worker",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "pullEndpoint",
@ -72,7 +71,7 @@ var runCmd = &cli.Command{
return err
}
if v.APIVersion != build.APIVersion {
return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
return xerrors.Errorf("lotus-storage-miner API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
go func() {
@ -80,6 +79,6 @@ var runCmd = &cli.Command{
os.Exit(0)
}()
return http.ListenAndServe(cctx.String("pullEndpoint"), nil)
return acceptJobs(ctx, nodeApi)
},
}

View File

@ -1,13 +1,14 @@
package lotus_worker
package main
import (
"context"
"golang.org/x/xerrors"
"io"
"net/http"
"os"
"path/filepath"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
@ -20,9 +21,30 @@ type worker struct {
sb *sectorbuilder.SectorBuilder
}
func acceptJobs(ctx context.Context, api api.StorageMiner) error {
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo string) error {
act, err := api.ActorAddress(ctx)
if err != nil {
return err
}
ssize, err := api.ActorSectorSize(ctx, act)
if err != nil {
return err
}
sb, err := sectorbuilder.NewStandalone(&sectorbuilder.Config{
SectorSize: ssize,
Miner: act,
WorkerThreads: 1,
CacheDir: filepath.Join(repo, "cache"),
SealedDir: filepath.Join(repo, "sealed"),
StagedDir: filepath.Join(repo, "staged"),
})
w := &worker{
api: api,
api: api,
minerEndpoint: endpoint,
repo: repo,
sb: sb,
}
tasks, err := api.WorkerQueue(ctx)
@ -33,8 +55,13 @@ func acceptJobs(ctx context.Context, api api.StorageMiner) error {
for task := range tasks {
res := w.processTask(ctx, task)
api.WorkerDone(ctx)
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
log.Error(err)
}
}
log.Warn("acceptJobs exit")
return nil
}
func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) sectorbuilder.SealRes {

1
go.mod
View File

@ -19,6 +19,7 @@ require (
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.4.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3

2
go.sum
View File

@ -123,6 +123,8 @@ github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=

View File

@ -44,6 +44,24 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) {
return dir, err
}
func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File, error) {
switch typ {
case "staged":
return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644)
default:
return nil, xerrors.Errorf("unknown sector type for read: %s", typ)
}
}
func (sb *SectorBuilder) OpenRemoteWrite(typ string, sectorName string) (*os.File, error) {
switch typ {
case "sealed":
return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_WRONLY|os.O_CREATE, 0644)
default:
return nil, xerrors.Errorf("unknown sector type for write: %s", typ)
}
}
func toReadableFile(r io.Reader, n int64) (*os.File, func() error, error) {
f, ok := r.(*os.File)
if ok {

View File

@ -39,10 +39,10 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro
}
sb.remotes = append(sb.remotes, r)
go sb.remoteWorker(ctx, r)
sb.remoteLk.Unlock()
go sb.remoteWorker(ctx, r)
return taskCh, nil
}
@ -92,6 +92,7 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
case <-sb.stopping:
return
}
case <-ctx.Done():
log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err())
return
@ -104,6 +105,10 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
case <-sb.stopping:
return
}
r.lk.Lock()
r.busy = 0
r.lk.Unlock()
}
}

View File

@ -82,7 +82,7 @@ type remote struct {
lk sync.Mutex
sealTasks chan<- WorkerTask
busy uint64
busy uint64 // only for metrics
}
type Config struct {
@ -157,11 +157,37 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{},
stopping: make(chan struct{}),
}
return sb, nil
}
func NewStandalone(cfg *Config) (*SectorBuilder, error) {
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} {
if err := os.Mkdir(dir, 0755); err != nil {
if os.IsExist(err) {
continue
}
return nil, err
}
}
return &SectorBuilder{
handle: nil,
ds: nil,
ssize: cfg.SectorSize,
Miner: cfg.Miner,
stagedDir: cfg.StagedDir,
sealedDir: cfg.SealedDir,
cacheDir: cfg.CacheDir,
sealLocal: true,
rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}),
}, nil
}
func (sb *SectorBuilder) RateLimit() func() {
if cap(sb.rateLimit) == len(sb.rateLimit) {
log.Warn("rate-limiting sectorbuilder call")

View File

@ -2,6 +2,10 @@ package impl
import (
"context"
"encoding/json"
"github.com/gorilla/mux"
"io"
"net/http"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address"
@ -21,6 +25,57 @@ type StorageMinerAPI struct {
Full api.FullNode
}
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
if !api.HasPerm(r.Context(), api.PermAdmin) {
w.WriteHeader(401)
json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
return
}
mux := mux.NewRouter()
mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT")
mux.ServeHTTP(w, r)
}
func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
fr, err := sm.SectorBuilder.OpenRemoteRead(vars["type"], vars["sname"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
defer fr.Close()
w.WriteHeader(200)
if _, err := io.Copy(w, fr); err != nil {
log.Error(err)
return
}
}
func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
fr, err := sm.SectorBuilder.OpenRemoteWrite(vars["type"], vars["sname"])
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
defer fr.Close()
w.WriteHeader(200)
if _, err := io.Copy(w, fr); err != nil {
log.Error(err)
return
}
}
func (sm *StorageMinerAPI) WorkerStats(context.Context) (api.WorkerStats, error) {
free, reserved, total := sm.SectorBuilder.WorkerStats()
return api.WorkerStats{
@ -34,6 +89,10 @@ func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error
return sm.SectorBuilderConfig.Miner, nil
}
func (sm *StorageMinerAPI) ActorSectorSize(ctx context.Context, addr address.Address) (uint64, error) {
return sm.Full.StateMinerSectorSize(ctx, addr, nil)
}
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) error {
return sm.Miner.StoreGarbageData()
}
@ -96,7 +155,7 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde
}
func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error {
return sm.SectorBuilder.TaskDone(task, res)
return sm.SectorBuilder.TaskDone(ctx, task, res)
}
var _ api.StorageMiner = &StorageMinerAPI{}