From 89556819aeb1248b409b448bab3be20f7af11fb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 30 Nov 2019 14:22:50 +0100 Subject: [PATCH] seal-worker: Handle cache --- cmd/lotus-seal-worker/sub.go | 98 ++++++++++++++++++++++-------- lib/sectorbuilder/files.go | 19 ++---- lib/sectorbuilder/sectorbuilder.go | 1 + node/impl/storminer.go | 86 +++++++++++++++++++++----- scripts/dev/drop-local-repos | 2 +- 5 files changed, 153 insertions(+), 53 deletions(-) diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 436b92dd4..04480e931 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -2,8 +2,11 @@ package main import ( "context" + files "github.com/ipfs/go-ipfs-files" "gopkg.in/cheggaaa/pb.v1" "io" + "mime" + "mime/multipart" "net/http" "os" "path/filepath" @@ -64,7 +67,7 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth res := w.processTask(ctx, task) - log.Infof("Task %d done, err: %s", task.TaskID, res.Err) + log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { log.Error(err) @@ -97,11 +100,13 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) } res.Rspco = rspco.ToJson() - // TODO: push cache - if err := w.push("sealed", task.SectorID); err != nil { return errRes(xerrors.Errorf("pushing precommited data: %w", err)) } + + if err := w.push("cache", task.SectorID); err != nil { + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) + } case sectorbuilder.WorkerCommit: proof, err := w.sb.SealCommit(task.SectorID, task.SealTicket, task.SealSeed, task.Pieces, task.Rspco) if err != nil { @@ -110,7 +115,9 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) res.Proof = proof - // TODO: Push cache + if err := w.push("cache", task.SectorID); err != nil { + return errRes(xerrors.Errorf("pushing precommited data: %w", err)) + } } return res @@ -120,56 +127,82 @@ func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Fetch %s", url) + log.Infof("Fetch %s %s", typ, url) req, err := http.NewRequest("GET", url, nil) if err != nil { - return err + return xerrors.Errorf("request: %w", err) } req.Header = w.auth resp, err := http.DefaultClient.Do(req) if err != nil { - return err + return xerrors.Errorf("do request: %w", err) } defer resp.Body.Close() - out, err := os.Create(outname) - if err != nil { - return err - } - defer out.Close() - bar := pb.New64(resp.ContentLength) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES + barreader := bar.NewProxyReader(resp.Body) + bar.Start() defer bar.Finish() - _, err = io.Copy(out, bar.NewProxyReader(resp.Body)) - return err + mediatype, p, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return xerrors.Errorf("parse media type: %w", err) + } + + var file files.Node + switch mediatype { + case "multipart/form-data": + mpr := multipart.NewReader(barreader, p["boundary"]) + + file, err = files.NewFileFromPartReader(mpr, mediatype) + if err != nil { + return xerrors.Errorf("call to NewFileFromPartReader failed: %w", err) + } + + case "application/octet-stream": + file = files.NewReaderFile(barreader) + default: + return xerrors.Errorf("unknown content type: '%s'", mediatype) + } + + // WriteTo is unhappy when things exist + if err := os.RemoveAll(outname); err != nil { + return xerrors.Errorf("removing dest: %w", err) + } + + return files.WriteTo(file, outname) } func (w *worker) push(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - f, err := os.OpenFile(outname, os.O_RDONLY, 0644) + stat, err := os.Stat(outname) + if err != nil { + return err + } + + f, err := files.NewSerialFile(outname, false, stat) if err != nil { return err } url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) - log.Infof("Push %s", url) + log.Infof("Push %s %s", typ, url) - fi, err := f.Stat() + sz, err := f.Size() if err != nil { - return err + return xerrors.Errorf("getting size: %w", err) } - bar := pb.New64(fi.Size()) + bar := pb.New64(sz) bar.ShowPercent = true bar.ShowSpeed = true bar.Units = pb.U_BYTES @@ -177,11 +210,25 @@ func (w *worker) push(typ string, sectorID uint64) error { bar.Start() defer bar.Finish() //todo set content size - req, err := http.NewRequest("PUT", url, bar.NewProxyReader(f)) + + header := w.auth + + var r io.Reader + r, file := f.(files.File) + if !file { + mfr := files.NewMultiFileReader(f.(files.Directory), true) + + header.Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) + r = mfr + } else { + header.Set("Content-Type", "application/octet-stream") + } + + req, err := http.NewRequest("PUT", url, bar.NewProxyReader(r)) if err != nil { return err } - req.Header = w.auth + req.Header = header resp, err := http.DefaultClient.Do(req) if err != nil { @@ -201,7 +248,10 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) err = w.fetch("staged", sectorID) case sectorbuilder.WorkerCommit: err = w.fetch("sealed", sectorID) - // todo: cache + if err != nil { + return xerrors.Errorf("fetch sealed: %w", err) + } + err = w.fetch("cache", sectorID) } if err != nil { return xerrors.Errorf("fetch failed: %w", err) @@ -210,5 +260,5 @@ func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) } func errRes(err error) sectorbuilder.SealRes { - return sectorbuilder.SealRes{Err: err.Error()} + return sectorbuilder.SealRes{Err: err.Error(), GoErr: err} } diff --git a/lib/sectorbuilder/files.go b/lib/sectorbuilder/files.go index 0869d4030..876bc71b7 100644 --- a/lib/sectorbuilder/files.go +++ b/lib/sectorbuilder/files.go @@ -44,23 +44,16 @@ func (sb *SectorBuilder) sectorCacheDir(sectorID uint64) (string, error) { return dir, err } -func (sb *SectorBuilder) OpenRemoteRead(typ string, sectorName string) (*os.File, error) { +func (sb *SectorBuilder) GetPath(typ string, sectorName string) (string, error) { switch typ { case "staged": - return os.OpenFile(filepath.Join(sb.stagedDir, sectorName), os.O_RDONLY, 0644) + return filepath.Join(sb.stagedDir, sectorName), nil case "sealed": - return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_RDONLY, 0644) + return filepath.Join(sb.sealedDir, sectorName), nil + case "cache": + return filepath.Join(sb.cacheDir, sectorName), nil default: - return nil, xerrors.Errorf("unknown sector type for read: %s", typ) - } -} - -func (sb *SectorBuilder) OpenRemoteWrite(typ string, sectorName string) (*os.File, error) { - switch typ { - case "sealed": - return os.OpenFile(filepath.Join(sb.sealedDir, sectorName), os.O_WRONLY|os.O_CREATE, 0644) - default: - return nil, xerrors.Errorf("unknown sector type for write: %s", typ) + return "", xerrors.Errorf("unknown sector type for write: %s", typ) } } diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index eaebde51f..b8e08c3c4 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -98,6 +98,7 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { type SealRes struct { Err string + GoErr error `json:"-"` Proof []byte Rspco JsonRSPCO diff --git a/node/impl/storminer.go b/node/impl/storminer.go index a4f04cd62..96884170f 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -3,17 +3,18 @@ package impl import ( "context" "encoding/json" - "fmt" - "github.com/gorilla/mux" - "io" - "net/http" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" + "github.com/gorilla/mux" + files "github.com/ipfs/go-ipfs-files" + "io" + "mime" + "net/http" + "os" ) type StorageMinerAPI struct { @@ -48,22 +49,40 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - fr, err := sm.SectorBuilder.OpenRemoteRead(vars["type"], vars["sname"]) + path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) if err != nil { log.Error(err) w.WriteHeader(500) return } - defer fr.Close() - fi, err := fr.Stat() + stat, err := os.Stat(path) if err != nil { + log.Error(err) + w.WriteHeader(500) return } - w.Header().Set("Content-Length", fmt.Sprint(fi.Size())) + f, err := files.NewSerialFile(path, false, stat) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + var rd io.Reader + rd, file := f.(files.File) + if !file { + mfr := files.NewMultiFileReader(f.(files.Directory), true) + + w.Header().Set("Content-Type", "multipart/form-data; boundary="+mfr.Boundary()) + rd = mfr + } else { + w.Header().Set("Content-Type", "application/octet-stream") + } + w.WriteHeader(200) - if _, err := io.Copy(w, fr); err != nil { + if _, err := io.Copy(w, rd); err != nil { log.Error(err) return } @@ -72,21 +91,58 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - fr, err := sm.SectorBuilder.OpenRemoteWrite(vars["type"], vars["sname"]) + path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) if err != nil { log.Error(err) w.WriteHeader(500) return } - defer fr.Close() - w.WriteHeader(200) - n, err := io.Copy(fr, r.Body) + var file files.Node + + mediatype, _, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { log.Error(err) + w.WriteHeader(500) return } - log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], n) + + switch mediatype { + case "multipart/form-data": + mpr, err := r.MultipartReader() + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + file, err = files.NewFileFromPartReader(mpr, mediatype) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + default: + file = files.NewReaderFile(r.Body) + } + + // WriteTo is unhappy when things exist (also cleans up cache after Commit) + if err := os.RemoveAll(path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + if err := files.WriteTo(file, path); err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + w.WriteHeader(200) + + log.Infof("received %s sector (%s): %d bytes", vars["type"], vars["sname"], r.ContentLength) } func (sm *StorageMinerAPI) WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) { diff --git a/scripts/dev/drop-local-repos b/scripts/dev/drop-local-repos index df43d3e6c..939030bad 100755 --- a/scripts/dev/drop-local-repos +++ b/scripts/dev/drop-local-repos @@ -2,4 +2,4 @@ set -o xtrace -rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors \ No newline at end of file +rm -rf ~/.lotus ~/.lotusstorage/ ~/.genesis-sectors ~/.lotusworker