From b2001db9d530298c718d2f1f25b212b7730bf076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 4 Dec 2019 17:53:32 +0100 Subject: [PATCH] worker: Fix rebaining bugs --- Makefile | 6 ++--- build/params_devnet.go | 6 +++++ build/params_shared.go | 6 ----- cmd/lotus-seal-worker/main.go | 2 +- cmd/lotus-seal-worker/sub.go | 24 +++++++++---------- cmd/lotus-seal-worker/transfer.go | 16 +++++++++---- cmd/lotus-storage-miner/init.go | 6 ++++- cmd/lotus-storage-miner/run.go | 38 ++++++++++++++++++------------ lib/sectorbuilder/remote.go | 2 +- lib/sectorbuilder/sectorbuilder.go | 16 ++++++------- lib/sectorbuilder/simple.go | 2 +- lib/systar/systar.go | 34 +++++++++++++------------- lotuspond/front/src/StorageNode.js | 5 ++++ lotuspond/spawn.go | 6 ++--- node/impl/storminer.go | 7 +++--- 15 files changed, 99 insertions(+), 77 deletions(-) diff --git a/Makefile b/Makefile index 0a3235bc0..90dc91d5d 100644 --- a/Makefile +++ b/Makefile @@ -46,7 +46,7 @@ deps: $(BUILD_DEPS) .PHONY: deps debug: GOFLAGS=-tags=debug -debug: lotus lotus-storage-miner +debug: lotus lotus-storage-miner lotus-seal-worker lotus-seed lotus: $(BUILD_DEPS) rm -f lotus @@ -65,7 +65,7 @@ BINS+=lotus-storage-miner lotus-seal-worker: $(BUILD_DEPS) rm -f lotus-seal-worker - go build -o lotus-seal-worker ./cmd/lotus-seal-worker + go build $(GOFLAGS) -o lotus-seal-worker ./cmd/lotus-seal-worker go run github.com/GeertJohan/go.rice/rice append --exec lotus-seal-worker -i ./build .PHONY: lotus-seal-worker BINS+=lotus-seal-worker @@ -81,7 +81,7 @@ install: lotus-seed: $(BUILD_DEPS) rm -f lotus-seed - go build -o lotus-seed ./cmd/lotus-seed + go build $(GOFLAGS) -o lotus-seed ./cmd/lotus-seed go run github.com/GeertJohan/go.rice/rice append --exec lotus-seed -i ./build .PHONY: lotus-seed diff --git a/build/params_devnet.go b/build/params_devnet.go index 6f53d2704..6c4dab128 100644 --- a/build/params_devnet.go +++ b/build/params_devnet.go @@ -2,6 +2,12 @@ package build +var SectorSizes = []uint64{ + 16 << 20, + 256 << 20, + 1 << 30, +} + // Seconds const BlockDelay = 30 diff --git a/build/params_shared.go b/build/params_shared.go index 40eb643f0..3d3bc9aac 100644 --- a/build/params_shared.go +++ b/build/params_shared.go @@ -12,12 +12,6 @@ import ( const UnixfsChunkSize uint64 = 1 << 20 const UnixfsLinksPerLevel = 1024 -var SectorSizes = []uint64{ - 16 << 20, - 256 << 20, - 1 << 30, -} - func SupportedSectorSize(ssize uint64) bool { for _, ss := range SectorSizes { if ssize == ss { diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 8fe1f8e9d..3fa05d037 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -83,7 +83,7 @@ var runCmd = &cli.Command{ go func() { <-ctx.Done() - os.Exit(0) + log.Warn("Shutting down..") }() return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index 416f8e373..b3cb828e0 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -2,13 +2,7 @@ package main import ( "context" - files "github.com/ipfs/go-ipfs-files" - "gopkg.in/cheggaaa/pb.v1" - "io" - "mime" - "mime/multipart" "net/http" - "os" "path/filepath" "golang.org/x/xerrors" @@ -62,15 +56,21 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth return err } - for task := range tasks { - log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) +loop: + for { + select { + case task := <-tasks: + log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) - res := w.processTask(ctx, task) + res := w.processTask(ctx, task) - log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) + log.Infof("Task %d done, err: %+v", task.TaskID, res.GoErr) - if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { - log.Error(err) + if err := api.WorkerDone(ctx, task.TaskID, res); err != nil { + log.Error(err) + } + case <-ctx.Done(): + break loop } } diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index b363e2324..68727c056 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -31,9 +31,12 @@ func (w *worker) fetch(typ string, sectorID uint64) error { if err != nil { return xerrors.Errorf("do request: %w", err) } - defer resp.Body.Close() + if resp.StatusCode != 200 { + return xerrors.Errorf("non-200 code: %d", resp.StatusCode) + } + bar := pb.New64(resp.ContentLength) bar.ShowPercent = true bar.ShowSpeed = true @@ -49,14 +52,13 @@ func (w *worker) fetch(typ string, sectorID uint64) error { return xerrors.Errorf("parse media type: %w", err) } - // WriteTo is unhappy when things exist if err := os.RemoveAll(outname); err != nil { return xerrors.Errorf("removing dest: %w", err) } switch mediatype { case "application/x-tar": - return systar.ExtractTar(barreader, outname) + return systar.ExtractTar(barreader, filepath.Dir(outname)) case "application/octet-stream": return files.WriteTo(files.NewReaderFile(barreader), outname) default: @@ -98,7 +100,6 @@ func (w *worker) push(typ string, sectorID uint64) error { header := w.auth - if stat.IsDir() { header.Set("Content-Type", "application/x-tar") } else { @@ -119,7 +120,12 @@ func (w *worker) push(typ string, sectorID uint64) error { return xerrors.Errorf("non-200 response: %d", resp.StatusCode) } - return resp.Body.Close() + if err := resp.Body.Close(); err != nil { + return err + } + + // TODO: keep files around for later stages of sealing + return os.RemoveAll(filename) } func (w *worker) fetchSector(sectorID uint64, typ sectorbuilder.WorkerTaskType) error { diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 359277cce..65b6c9597 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -71,6 +71,10 @@ var initCmd = &cli.Command{ Name: "pre-sealed-sectors", Usage: "specify set of presealed sectors for starting as a genesis miner", }, + &cli.BoolFlag{ + Name: "nosync", + Usage: "don't check full-node sync status", + }, }, Action: func(cctx *cli.Context) error { log.Info("Initializing lotus storage miner") @@ -91,7 +95,7 @@ var initCmd = &cli.Command{ log.Info("Checking full node sync status") - if !cctx.Bool("genesis-miner") { + if !cctx.Bool("genesis-miner") && !cctx.Bool("nosync") { if err := lcli.SyncWait(ctx, api); err != nil { return xerrors.Errorf("sync wait: %w", err) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 0dbba9159..b96c20f6e 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -24,6 +24,8 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) +const defaultListen = "/ip4/127.0.0.1/tcp/" + var runCmd = &cli.Command{ Name: "run", Usage: "Start a lotus storage miner process", @@ -32,18 +34,14 @@ var runCmd = &cli.Command{ Name: "api", Value: "2345", }, - &cli.StringFlag{ - Name: "apihost", - Value: "127.0.0.1", - }, &cli.BoolFlag{ Name: "enable-gpu-proving", - Usage: "Enable use of GPU for mining operations", + Usage: "enable use of GPU for mining operations", Value: true, }, &cli.BoolFlag{ Name: "nosync", - Usage: "Don't check full-node sync status", + Usage: "don't check full-node sync status", }, }, Action: func(cctx *cli.Context) error { @@ -99,15 +97,13 @@ var runCmd = &cli.Command{ node.Online(), node.Repo(r), - node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") || cctx.IsSet("apihost") }, - node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr("/ip4/"+cctx.String("apihost")+"/tcp/" + - cctx.String("api")) - if err != nil { - return err - } - return lr.SetAPIEndpoint(apima) - })), + node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := parseApi(cctx.String("api")) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }), node.Override(new(api.FullNode), nodeApi), ) @@ -170,3 +166,15 @@ var runCmd = &cli.Command{ return srv.Serve(manet.NetListener(lst)) }, } + +func parseApi(api string) (multiaddr.Multiaddr, error) { + if api == "" { + return nil, xerrors.New("empty --api") + } + + if api[0] != '/' { + api = defaultListen + api + } + + return multiaddr.NewMultiaddr(api) +} diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index 78477f364..5add710ff 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -35,6 +35,7 @@ type workerCall struct { func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { sb.remoteLk.Lock() + defer sb.remoteLk.Unlock() taskCh := make(chan WorkerTask) r := &remote{ @@ -44,7 +45,6 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro sb.remoteCtr++ sb.remotes[sb.remoteCtr] = r - sb.remoteLk.Unlock() go sb.remoteWorker(ctx, r) diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index cc11df891..400bdb058 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -101,7 +101,7 @@ func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { } type SealRes struct { - Err string + Err string GoErr error `json:"-"` Proof []byte @@ -201,15 +201,15 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { } return &SectorBuilder{ - ds: nil, + ds: nil, - ssize: cfg.SectorSize, + ssize: cfg.SectorSize, - Miner: cfg.Miner, - stagedDir: cfg.StagedDir, - sealedDir: cfg.SealedDir, - cacheDir: cfg.CacheDir, - unsealedDir:cfg.UnsealedDir, + Miner: cfg.Miner, + stagedDir: cfg.StagedDir, + sealedDir: cfg.SealedDir, + cacheDir: cfg.CacheDir, + unsealedDir: cfg.UnsealedDir, sealLocal: true, taskCtr: 1, diff --git a/lib/sectorbuilder/simple.go b/lib/sectorbuilder/simple.go index a5360e102..f20395ddd 100644 --- a/lib/sectorbuilder/simple.go +++ b/lib/sectorbuilder/simple.go @@ -76,4 +76,4 @@ func GeneratePieceCommitment(piece io.Reader, pieceSize uint64) (commP [CommLen] func GenerateDataCommitment(ssize uint64, pieces []sectorbuilder.PublicPieceInfo) ([CommLen]byte, error) { return sectorbuilder.GenerateDataCommitment(ssize, pieces) -} \ No newline at end of file +} diff --git a/lib/systar/systar.go b/lib/systar/systar.go index 1fb42d108..c83999376 100644 --- a/lib/systar/systar.go +++ b/lib/systar/systar.go @@ -1,12 +1,22 @@ package systar import ( + "golang.org/x/xerrors" "io" + "os" "os/exec" "path/filepath" + + logging "github.com/ipfs/go-log" ) +var log = logging.Logger("systar") + func ExtractTar(body io.Reader, dest string) error { + if err := os.MkdirAll(dest, 0755); err != nil { + return xerrors.Errorf("creating dest directory: %w", err) + } + cmd := exec.Command("tar", "-xS", "-C", dest) cmd.Stdin = body return cmd.Run() @@ -27,23 +37,11 @@ func TarDirectory(file string) (io.ReadCloser, error) { return nil, err } - return &struct { - io.Reader - io.Closer - }{ - Reader: i, - Closer: closer(func() error { - e1 := i.Close() - if err := cmd.Wait(); err != nil { - return err - } + go func() { + if err := o.CloseWithError(cmd.Wait()); err != nil { + log.Error(err) + } + }() - return e1 - }), - }, nil -} - -type closer func() error -func (cl closer) Close() error { - return cl() + return i, nil } diff --git a/lotuspond/front/src/StorageNode.js b/lotuspond/front/src/StorageNode.js index 903729d9f..7851fbb4a 100644 --- a/lotuspond/front/src/StorageNode.js +++ b/lotuspond/front/src/StorageNode.js @@ -16,6 +16,11 @@ let sealCodes = [ "PreCommitted", "Committing", "Proving", + + "SealFailed", + "PreCommitFailed", + "SealCommitFailed", + "CommitFailed", ] class StorageNode extends React.Component { diff --git a/lotuspond/spawn.go b/lotuspond/spawn.go index 5eb8ca52d..bcf2dd001 100644 --- a/lotuspond/spawn.go +++ b/lotuspond/spawn.go @@ -136,7 +136,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { return nodeInfo{}, err } - initArgs := []string{"init"} + initArgs := []string{"init", "--nosync"} if fullNodeRepo == api.running[1].meta.Repo { initArgs = []string{"init", "--actor=t0101", "--genesis-miner", "--pre-sealed-sectors=" + filepath.Join(fullNodeRepo, "preseal")} } @@ -154,7 +154,7 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { mux := newWsMux() - cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync") cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) cmd.Env = append(os.Environ(), "LOTUS_STORAGE_PATH="+dir, "LOTUS_PATH="+fullNodeRepo) @@ -214,7 +214,7 @@ func (api *api) RestartNode(id int32) (nodeInfo, error) { var cmd *exec.Cmd if nd.meta.Storage { - cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id), "--nosync") } else { cmd = exec.Command("./lotus", "daemon", "--api", fmt.Sprintf("%d", 2500+id)) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 5e413c805..25cdf16b9 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -16,6 +16,7 @@ import ( "mime" "net/http" "os" + "path/filepath" ) type StorageMinerAPI struct { @@ -110,7 +111,9 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques switch mediatype { case "application/x-tar": - if err := systar.ExtractTar(r.Body, path); err != nil { + if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil { + log.Error(err) + w.WriteHeader(500) return } default: @@ -203,8 +206,6 @@ func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilde } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { - log.Infof("WDUN RSPKO %v", res.Rspco) - return sm.SectorBuilder.TaskDone(ctx, task, res) }