Working remote PreCommit

This commit is contained in:
Łukasz Magiera 2019-11-21 19:38:43 +01:00
parent 98b1de33b6
commit d4197bbadc
11 changed files with 108 additions and 36 deletions

View File

@ -72,13 +72,13 @@ lotus-storage-miner: $(BUILD_DEPS)
rm -f lotus-storage-miner rm -f lotus-storage-miner
go build -o lotus-storage-miner ./cmd/lotus-storage-miner go build -o lotus-storage-miner ./cmd/lotus-storage-miner
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
.PHONY: lotus-storage-miner
lotus-worker: $(BUILD_DEPS) lotus-worker: $(BUILD_DEPS)
rm -f lotus-worker rm -f lotus-worker
go build -o lotus-worker ./cmd/lotus-worker go build -o lotus-worker ./cmd/lotus-worker
go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build
.PHONY: lotus-worker
.PHONY: lotus-storage-miner
CLEAN+=lotus-storage-miner CLEAN+=lotus-storage-miner

View File

@ -37,7 +37,7 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours
// Consensus / Network // Consensus / Network
// Seconds // Seconds
const BlockDelay = 12 const BlockDelay = 2
// Seconds // Seconds
const AllowableClockDrift = BlockDelay * 2 const AllowableClockDrift = BlockDelay * 2
@ -60,7 +60,7 @@ const WRatioDen = 2
// Proofs // Proofs
// Blocks // Blocks
const ProvingPeriodDuration uint64 = 300 const ProvingPeriodDuration uint64 = 30
// PoStChallangeTime sets the window in which post computation should happen // PoStChallangeTime sets the window in which post computation should happen
// Blocks // Blocks

View File

@ -41,7 +41,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) {
ma, err := r.APIEndpoint() ma, err := r.APIEndpoint()
if err != nil { if err != nil {
return "", "", xerrors.Errorf("failed to get api endpoint: %w", err) return "", "", xerrors.Errorf("failed to get api endpoint: (%s) %w", p, err)
} }
_, addr, err := manet.DialArgs(ma) _, addr, err := manet.DialArgs(ma)
if err != nil { if err != nil {
@ -51,7 +51,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) {
return p, addr, nil return p, addr, nil
} }
func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { func GetRawAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) {
rdir, addr, err := RepoInfo(ctx, repoFlag) rdir, addr, err := RepoInfo(ctx, repoFlag)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
@ -80,7 +80,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) {
f = "storagerepo" f = "storagerepo"
} }
addr, headers, err := getAPI(ctx, f) addr, headers, err := GetRawAPI(ctx, f)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -89,7 +89,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) {
} }
func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) { func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "repo") addr, headers, err := GetRawAPI(ctx, "repo")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -98,7 +98,7 @@ func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error
} }
func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) { func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) {
addr, headers, err := getAPI(ctx, "storagerepo") addr, headers, err := GetRawAPI(ctx, "storagerepo")
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -60,8 +60,8 @@ var infoCmd = &cli.Command{
} }
fmt.Printf("Worker use:\n") fmt.Printf("Worker use:\n")
fmt.Printf("\tLocal: %d / %d (+%d reserved)\n",wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved ) fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved)
fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal - wstat.RemotesFree, wstat.RemotesTotal) fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal)
ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil)
if err != nil { if err != nil {

View File

@ -126,8 +126,8 @@ var runCmd = &cli.Command{
rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi))
mux.Handle("/rpc/v0", rpcServer) mux.Handle("/rpc/v0", rpcServer)
mux.HandleFunc("/remote", minerapi.(*impl.StorageMinerAPI).ServeRemote) mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote)
mux.Handle("/", http.DefaultServeMux) // pprof mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
ah := &auth.Handler{ ah := &auth.Handler{
Verify: minerapi.AuthVerify, Verify: minerapi.AuthVerify,

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"github.com/mitchellh/go-homedir"
"os" "os"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -44,7 +45,7 @@ func main() {
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Warn(err) log.Warnf("%+v", err)
return return
} }
} }
@ -52,26 +53,22 @@ func main() {
var runCmd = &cli.Command{ var runCmd = &cli.Command{
Name: "run", Name: "run",
Usage: "Start lotus worker", Usage: "Start lotus worker",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "pullEndpoint",
Value: "127.0.0.1:30003",
},
},
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil { if err != nil {
return err return xerrors.Errorf("getting miner api: %w", err)
} }
defer closer() defer closer()
ctx := lcli.ReqContext(cctx) ctx := lcli.ReqContext(cctx)
_, auth, err := lcli.GetRawAPI(cctx, "storagerepo")
_, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo") _, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo")
if err != nil { if err != nil {
return err return xerrors.Errorf("getting miner repo: %w", err)
} }
r, _, err := lcli.RepoInfo(cctx, "repo") r, err := homedir.Expand(cctx.String("repo"))
if err != nil { if err != nil {
return err return err
} }
@ -89,6 +86,6 @@ var runCmd = &cli.Command{
os.Exit(0) os.Exit(0)
}() }()
return acceptJobs(ctx, nodeApi, storageAddr, r) return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r)
}, },
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"gopkg.in/cheggaaa/pb.v1"
"io" "io"
"net/http" "net/http"
"os" "os"
@ -17,11 +18,12 @@ type worker struct {
api api.StorageMiner api api.StorageMiner
minerEndpoint string minerEndpoint string
repo string repo string
auth http.Header
sb *sectorbuilder.SectorBuilder sb *sectorbuilder.SectorBuilder
} }
func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo string) error { func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error {
act, err := api.ActorAddress(ctx) act, err := api.ActorAddress(ctx)
if err != nil { if err != nil {
return err return err
@ -38,11 +40,16 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo
CacheDir: filepath.Join(repo, "cache"), CacheDir: filepath.Join(repo, "cache"),
SealedDir: filepath.Join(repo, "sealed"), SealedDir: filepath.Join(repo, "sealed"),
StagedDir: filepath.Join(repo, "staged"), StagedDir: filepath.Join(repo, "staged"),
MetadataDir: filepath.Join(repo, "meta"),
}) })
if err != nil {
return err
}
w := &worker{ w := &worker{
api: api, api: api,
minerEndpoint: endpoint, minerEndpoint: endpoint,
auth: auth,
repo: repo, repo: repo,
sb: sb, sb: sb,
} }
@ -53,8 +60,12 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo
} }
for task := range tasks { for task := range tasks {
log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type)
res := w.processTask(ctx, task) res := w.processTask(ctx, task)
log.Infof("Task %d done, err: %s", task.TaskID, res.Err)
if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { if err := api.WorkerDone(ctx, task.TaskID, res); err != nil {
log.Error(err) log.Error(err)
} }
@ -90,7 +101,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
return errRes(err) return errRes(err)
} }
case sectorbuilder.WorkerCommit: case sectorbuilder.WorkerCommit:
panic("todo")
} }
return res return res
@ -99,10 +110,20 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask)
func (w *worker) fetch(typ string, sectorID uint64) error { func (w *worker) fetch(typ string, sectorID uint64) error {
outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID))
resp, err := http.Get(w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)) url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
log.Infof("Fetch %s", url)
req, err := http.NewRequest("GET", url, nil)
if err != nil { if err != nil {
return err return err
} }
req.Header = w.auth
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close() defer resp.Body.Close()
out, err := os.Create(outname) out, err := os.Create(outname)
@ -111,9 +132,15 @@ func (w *worker) fetch(typ string, sectorID uint64) error {
} }
defer out.Close() defer out.Close()
// TODO: progress bar bar := pb.New64(resp.ContentLength)
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
_, err = io.Copy(out, resp.Body) bar.Start()
defer bar.Finish()
_, err = io.Copy(out, bar.NewProxyReader(resp.Body))
return err return err
} }
@ -125,14 +152,35 @@ func (w *worker) push(typ string, sectorID uint64) error {
return err return err
} }
req, err := http.NewRequest("PUT", w.minerEndpoint+"/remote/"+typ+"/"+w.sb.SectorName(sectorID), f) url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)
log.Infof("Push %s", url)
fi, err := f.Stat()
if err != nil { if err != nil {
return err return err
} }
bar := pb.New64(fi.Size())
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES
bar.Start()
defer bar.Finish()
req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f))
if err != nil {
return err
}
req.Header = w.auth
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode != 200 {
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
}
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return err return err

View File

@ -38,7 +38,8 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro
busy: 0, busy: 0,
} }
sb.remotes = append(sb.remotes, r) sb.remoteCtr++
sb.remotes[sb.remoteCtr] = r
sb.remoteLk.Unlock() sb.remoteLk.Unlock()
go sb.remoteWorker(ctx, r) go sb.remoteWorker(ctx, r)
@ -59,6 +60,18 @@ func (sb *SectorBuilder) returnTask(task workerCall) {
func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) {
defer log.Warn("Remote worker disconnected") defer log.Warn("Remote worker disconnected")
defer func() {
sb.remoteLk.Lock()
defer sb.remoteLk.Unlock()
for i, vr := range sb.remotes {
if vr == r {
delete(sb.remotes, i)
return
}
}
}()
for { for {
select { select {
case task := <-sb.sealTasks: case task := <-sb.sealTasks:

View File

@ -67,7 +67,8 @@ type SectorBuilder struct {
taskCtr uint64 taskCtr uint64
remoteLk sync.Mutex remoteLk sync.Mutex
remotes []*remote remoteCtr int
remotes map[int]*remote
remoteResults map[uint64]chan<- SealRes remoteResults map[uint64]chan<- SealRes
stopping chan struct{} stopping chan struct{}
@ -160,6 +161,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
taskCtr: 1, taskCtr: 1,
sealTasks: make(chan workerCall), sealTasks: make(chan workerCall),
remoteResults: map[uint64]chan<- SealRes{}, remoteResults: map[uint64]chan<- SealRes{},
remotes: map[int]*remote{},
stopping: make(chan struct{}), stopping: make(chan struct{}),
} }
@ -169,7 +171,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) {
func NewStandalone(cfg *Config) (*SectorBuilder, error) { func NewStandalone(cfg *Config) (*SectorBuilder, error) {
for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} { for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} {
if err := os.Mkdir(dir, 0755); err != nil { if err := os.MkdirAll(dir, 0755); err != nil {
if os.IsExist(err) { if os.IsExist(err) {
continue continue
} }
@ -188,6 +190,7 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) {
sealLocal: true, sealLocal: true,
taskCtr: 1, taskCtr: 1,
remotes: map[int]*remote{},
rateLimit: make(chan struct{}, cfg.WorkerThreads), rateLimit: make(chan struct{}, cfg.WorkerThreads),
stopping: make(chan struct{}), stopping: make(chan struct{}),
}, nil }, nil
@ -210,12 +213,12 @@ func (sb *SectorBuilder) RateLimit() func() {
} }
type WorkerStats struct { type WorkerStats struct {
LocalFree int LocalFree int
LocalReserved int LocalReserved int
LocalTotal int LocalTotal int
// todo: post in progress // todo: post in progress
RemotesTotal int RemotesTotal int
RemotesFree int RemotesFree int
} }
func (sb *SectorBuilder) WorkerStats() WorkerStats { func (sb *SectorBuilder) WorkerStats() WorkerStats {

View File

@ -88,6 +88,8 @@ var shCmd = &cli.Command{
} }
} }
shcmd.Env = append(os.Environ(), shcmd.Env...)
shcmd.Stdin = os.Stdin shcmd.Stdin = os.Stdin
shcmd.Stdout = os.Stdout shcmd.Stdout = os.Stdout
shcmd.Stderr = os.Stderr shcmd.Stderr = os.Stderr

View File

@ -3,6 +3,7 @@ package impl
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"io" "io"
"net/http" "net/http"
@ -37,6 +38,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET")
mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT")
log.Infof("SERVEGETREMOTE %s", r.URL)
mux.ServeHTTP(w, r) mux.ServeHTTP(w, r)
} }
@ -51,6 +54,12 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques
} }
defer fr.Close() defer fr.Close()
fi, err := fr.Stat()
if err != nil {
return
}
w.Header().Set("Content-Length", fmt.Sprint(fi.Size()))
w.WriteHeader(200) w.WriteHeader(200)
if _, err := io.Copy(w, fr); err != nil { if _, err := io.Copy(w, fr); err != nil {
log.Error(err) log.Error(err)