From d4197bbadc028094a711199560c84742b8878680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Nov 2019 19:38:43 +0100 Subject: [PATCH] Working remote PreCommit --- Makefile | 4 +- build/params.go | 4 +- cli/cmd.go | 10 ++--- cmd/lotus-storage-miner/info.go | 4 +- cmd/lotus-storage-miner/run.go | 4 +- cmd/lotus-worker/main.go | 19 ++++------ cmd/lotus-worker/sub.go | 60 +++++++++++++++++++++++++++--- lib/sectorbuilder/remote.go | 15 +++++++- lib/sectorbuilder/sectorbuilder.go | 13 ++++--- lotuspond/main.go | 2 + node/impl/storminer.go | 9 +++++ 11 files changed, 108 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index 084a50900..58853783e 100644 --- a/Makefile +++ b/Makefile @@ -72,13 +72,13 @@ lotus-storage-miner: $(BUILD_DEPS) rm -f lotus-storage-miner go build -o lotus-storage-miner ./cmd/lotus-storage-miner go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build +.PHONY: lotus-storage-miner lotus-worker: $(BUILD_DEPS) rm -f lotus-worker go build -o lotus-worker ./cmd/lotus-worker go run github.com/GeertJohan/go.rice/rice append --exec lotus-worker -i ./build - -.PHONY: lotus-storage-miner +.PHONY: lotus-worker CLEAN+=lotus-storage-miner diff --git a/build/params.go b/build/params.go index 35ccd69f0..eb2d2a167 100644 --- a/build/params.go +++ b/build/params.go @@ -37,7 +37,7 @@ const PaymentChannelClosingDelay = 6 * 60 * 2 // six hours // Consensus / Network // Seconds -const BlockDelay = 12 +const BlockDelay = 2 // Seconds const AllowableClockDrift = BlockDelay * 2 @@ -60,7 +60,7 @@ const WRatioDen = 2 // Proofs // Blocks -const ProvingPeriodDuration uint64 = 300 +const ProvingPeriodDuration uint64 = 30 // PoStChallangeTime sets the window in which post computation should happen // Blocks diff --git a/cli/cmd.go b/cli/cmd.go index 11a1e09e0..472eb9732 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -41,7 +41,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) { ma, err := r.APIEndpoint() if err != nil { - return "", "", xerrors.Errorf("failed to get api endpoint: %w", err) + return "", "", xerrors.Errorf("failed to get api endpoint: (%s) %w", p, err) } _, addr, err := manet.DialArgs(ma) if err != nil { @@ -51,7 +51,7 @@ func RepoInfo(ctx *cli.Context, repoFlag string) (string, string, error) { return p, addr, nil } -func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { +func GetRawAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { rdir, addr, err := RepoInfo(ctx, repoFlag) if err != nil { return "", nil, err @@ -80,7 +80,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) { f = "storagerepo" } - addr, headers, err := getAPI(ctx, f) + addr, headers, err := GetRawAPI(ctx, f) if err != nil { return nil, nil, err } @@ -89,7 +89,7 @@ func GetAPI(ctx *cli.Context) (api.Common, jsonrpc.ClientCloser, error) { } func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error) { - addr, headers, err := getAPI(ctx, "repo") + addr, headers, err := GetRawAPI(ctx, "repo") if err != nil { return nil, nil, err } @@ -98,7 +98,7 @@ func GetFullNodeAPI(ctx *cli.Context) (api.FullNode, jsonrpc.ClientCloser, error } func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, error) { - addr, headers, err := getAPI(ctx, "storagerepo") + addr, headers, err := GetRawAPI(ctx, "storagerepo") if err != nil { return nil, nil, err } diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 6727e3e1e..68757f455 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -60,8 +60,8 @@ var infoCmd = &cli.Command{ } fmt.Printf("Worker use:\n") - fmt.Printf("\tLocal: %d / %d (+%d reserved)\n",wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved ) - fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal - wstat.RemotesFree, wstat.RemotesTotal) + fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved) + fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal) ppe, err := api.StateMinerProvingPeriodEnd(ctx, maddr, nil) if err != nil { diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index b083cb0d9..2145a90bc 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -126,8 +126,8 @@ var runCmd = &cli.Command{ rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) mux.Handle("/rpc/v0", rpcServer) - mux.HandleFunc("/remote", minerapi.(*impl.StorageMinerAPI).ServeRemote) - mux.Handle("/", http.DefaultServeMux) // pprof + mux.PathPrefix("/remote").HandlerFunc(minerapi.(*impl.StorageMinerAPI).ServeRemote) + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof ah := &auth.Handler{ Verify: minerapi.AuthVerify, diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index a8c4ae315..f2f2f0ffb 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -1,6 +1,7 @@ package main import ( + "github.com/mitchellh/go-homedir" "os" logging "github.com/ipfs/go-log" @@ -44,7 +45,7 @@ func main() { } if err := app.Run(os.Args); err != nil { - log.Warn(err) + log.Warnf("%+v", err) return } } @@ -52,26 +53,22 @@ func main() { var runCmd = &cli.Command{ Name: "run", Usage: "Start lotus worker", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "pullEndpoint", - Value: "127.0.0.1:30003", - }, - }, Action: func(cctx *cli.Context) error { nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) if err != nil { - return err + return xerrors.Errorf("getting miner api: %w", err) } defer closer() ctx := lcli.ReqContext(cctx) + _, auth, err := lcli.GetRawAPI(cctx, "storagerepo") + _, storageAddr, err := lcli.RepoInfo(cctx, "storagerepo") if err != nil { - return err + return xerrors.Errorf("getting miner repo: %w", err) } - r, _, err := lcli.RepoInfo(cctx, "repo") + r, err := homedir.Expand(cctx.String("repo")) if err != nil { return err } @@ -89,6 +86,6 @@ var runCmd = &cli.Command{ os.Exit(0) }() - return acceptJobs(ctx, nodeApi, storageAddr, r) + return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) }, } diff --git a/cmd/lotus-worker/sub.go b/cmd/lotus-worker/sub.go index 2a4cae317..803aa6026 100644 --- a/cmd/lotus-worker/sub.go +++ b/cmd/lotus-worker/sub.go @@ -2,6 +2,7 @@ package main import ( "context" + "gopkg.in/cheggaaa/pb.v1" "io" "net/http" "os" @@ -17,11 +18,12 @@ type worker struct { api api.StorageMiner minerEndpoint string repo string + auth http.Header sb *sectorbuilder.SectorBuilder } -func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo string) error { +func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error { act, err := api.ActorAddress(ctx) if err != nil { return err @@ -38,11 +40,16 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo CacheDir: filepath.Join(repo, "cache"), SealedDir: filepath.Join(repo, "sealed"), StagedDir: filepath.Join(repo, "staged"), + MetadataDir: filepath.Join(repo, "meta"), }) + if err != nil { + return err + } w := &worker{ api: api, minerEndpoint: endpoint, + auth: auth, repo: repo, sb: sb, } @@ -53,8 +60,12 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, repo } for task := range tasks { + log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) + res := w.processTask(ctx, task) + log.Infof("Task %d done, err: %s", task.TaskID, res.Err) + if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { log.Error(err) } @@ -90,7 +101,7 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(err) } case sectorbuilder.WorkerCommit: - + panic("todo") } return res @@ -99,10 +110,20 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - resp, err := http.Get(w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID)) + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Fetch %s", url) + + req, err := http.NewRequest("GET", url, nil) if err != nil { return err } + req.Header = w.auth + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() out, err := os.Create(outname) @@ -111,9 +132,15 @@ func (w *worker) fetch(typ string, sectorID uint64) error { } defer out.Close() - // TODO: progress bar + bar := pb.New64(resp.ContentLength) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES - _, err = io.Copy(out, resp.Body) + bar.Start() + defer bar.Finish() + + _, err = io.Copy(out, bar.NewProxyReader(resp.Body)) return err } @@ -125,14 +152,35 @@ func (w *worker) push(typ string, sectorID uint64) error { return err } - req, err := http.NewRequest("PUT", w.minerEndpoint+"/remote/"+typ+"/"+w.sb.SectorName(sectorID), f) + url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + log.Infof("Push %s", url) + + fi, err := f.Stat() if err != nil { return err } + + bar := pb.New64(fi.Size()) + bar.ShowPercent = true + bar.ShowSpeed = true + bar.Units = pb.U_BYTES + + bar.Start() + defer bar.Finish() + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f)) + if err != nil { + return err + } + req.Header = w.auth + resp, err := http.DefaultClient.Do(req) if err != nil { return err } + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 response: %d", resp.StatusCode) + } if err := f.Close(); err != nil { return err diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index d984baea5..00adb79f0 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -38,7 +38,8 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro busy: 0, } - sb.remotes = append(sb.remotes, r) + sb.remoteCtr++ + sb.remotes[sb.remoteCtr] = r sb.remoteLk.Unlock() go sb.remoteWorker(ctx, r) @@ -59,6 +60,18 @@ func (sb *SectorBuilder) returnTask(task workerCall) { func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { defer log.Warn("Remote worker disconnected") + defer func() { + sb.remoteLk.Lock() + defer sb.remoteLk.Unlock() + + for i, vr := range sb.remotes { + if vr == r { + delete(sb.remotes, i) + return + } + } + }() + for { select { case task := <-sb.sealTasks: diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 04f521a27..b3703dcb2 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -67,7 +67,8 @@ type SectorBuilder struct { taskCtr uint64 remoteLk sync.Mutex - remotes []*remote + remoteCtr int + remotes map[int]*remote remoteResults map[uint64]chan<- SealRes stopping chan struct{} @@ -160,6 +161,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { taskCtr: 1, sealTasks: make(chan workerCall), remoteResults: map[uint64]chan<- SealRes{}, + remotes: map[int]*remote{}, stopping: make(chan struct{}), } @@ -169,7 +171,7 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { func NewStandalone(cfg *Config) (*SectorBuilder, error) { for _, dir := range []string{cfg.StagedDir, cfg.SealedDir, cfg.CacheDir, cfg.MetadataDir} { - if err := os.Mkdir(dir, 0755); err != nil { + if err := os.MkdirAll(dir, 0755); err != nil { if os.IsExist(err) { continue } @@ -188,6 +190,7 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { sealLocal: true, taskCtr: 1, + remotes: map[int]*remote{}, rateLimit: make(chan struct{}, cfg.WorkerThreads), stopping: make(chan struct{}), }, nil @@ -210,12 +213,12 @@ func (sb *SectorBuilder) RateLimit() func() { } type WorkerStats struct { - LocalFree int + LocalFree int LocalReserved int - LocalTotal int + LocalTotal int // todo: post in progress RemotesTotal int - RemotesFree int + RemotesFree int } func (sb *SectorBuilder) WorkerStats() WorkerStats { diff --git a/lotuspond/main.go b/lotuspond/main.go index d3238dad7..17ad7c970 100644 --- a/lotuspond/main.go +++ b/lotuspond/main.go @@ -88,6 +88,8 @@ var shCmd = &cli.Command{ } } + shcmd.Env = append(os.Environ(), shcmd.Env...) + shcmd.Stdin = os.Stdin shcmd.Stdout = os.Stdout shcmd.Stderr = os.Stderr diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 254aab31c..bef961067 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,6 +3,7 @@ package impl import ( "context" "encoding/json" + "fmt" "github.com/gorilla/mux" "io" "net/http" @@ -37,6 +38,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") + log.Infof("SERVEGETREMOTE %s", r.URL) + mux.ServeHTTP(w, r) } @@ -51,6 +54,12 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques } defer fr.Close() + fi, err := fr.Stat() + if err != nil { + return + } + + w.Header().Set("Content-Length", fmt.Sprint(fi.Size())) w.WriteHeader(200) if _, err := io.Copy(w, fr); err != nil { log.Error(err)