From 83924e6b97574db055dde1a9707c9cfc470c3ea9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 6 Dec 2019 01:27:32 +0100 Subject: [PATCH 1/2] sectorbuilder: Allow to restrict task types License: MIT Signed-off-by: Jakub Sztandera --- api/api_storage.go | 2 +- api/struct.go | 8 +- chain/validation/state.go | 14 +-- cmd/lotus-chainwatch/dot.go | 4 +- cmd/lotus-seal-worker/main.go | 11 ++- cmd/lotus-seal-worker/sub.go | 20 ++++- cmd/lotus-seal-worker/transfer.go | 6 +- cmd/lotus-storage-miner/info.go | 6 ++ cmd/lotus-storage-miner/init.go | 4 +- cmd/lotus-storage-miner/sectors.go | 5 ++ extern/filecoin-ffi | 2 +- lib/sectorbuilder/remote.go | 110 +++++++++++++++--------- lib/sectorbuilder/sectorbuilder.go | 92 ++++++++++++++------ lib/sectorbuilder/sectorbuilder_test.go | 5 ++ lib/systar/systar.go | 47 ---------- lib/tarutil/systar.go | 88 +++++++++++++++++++ node/builder.go | 5 +- node/config/def.go | 3 + node/impl/storminer.go | 16 ++-- node/modules/storageminer.go | 9 +- storage/cbor_gen.go | 28 +++--- storage/sector_states.go | 2 - storage/sector_types.go | 14 ++- storage/sectors.go | 6 +- 24 files changed, 326 insertions(+), 181 deletions(-) delete mode 100644 lib/systar/systar.go create mode 100644 lib/tarutil/systar.go diff --git a/api/api_storage.go b/api/api_storage.go index 3e69a726d..5c41473f4 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -73,7 +73,7 @@ type StorageMiner interface { WorkerStats(context.Context) (sectorbuilder.WorkerStats, error) // WorkerQueue registers a remote worker - WorkerQueue(context.Context) (<-chan sectorbuilder.WorkerTask, error) + WorkerQueue(context.Context, sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error } diff --git a/api/struct.go b/api/struct.go index 8a52c105b..12b03f4ed 100644 --- a/api/struct.go +++ b/api/struct.go @@ -143,8 +143,8 @@ type StorageMinerStruct struct { WorkerStats func(context.Context) (sectorbuilder.WorkerStats, error) `perm:"read"` - WorkerQueue func(context.Context) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm - WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` + WorkerQueue func(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) `perm:"admin"` // TODO: worker perm + WorkerDone func(ctx context.Context, task uint64, res sectorbuilder.SealRes) error `perm:"admin"` } } @@ -522,8 +522,8 @@ func (c *StorageMinerStruct) WorkerStats(ctx context.Context) (sectorbuilder.Wor return c.Internal.WorkerStats(ctx) } -func (c *StorageMinerStruct) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { - return c.Internal.WorkerQueue(ctx) +func (c *StorageMinerStruct) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { + return c.Internal.WorkerQueue(ctx, cfg) } func (c *StorageMinerStruct) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { diff --git a/chain/validation/state.go b/chain/validation/state.go index f8671f3fd..a3acfcd68 100644 --- a/chain/validation/state.go +++ b/chain/validation/state.go @@ -8,10 +8,10 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-hamt-ipld" - "github.com/minio/blake2b-simd" blockstore "github.com/ipfs/go-ipfs-blockstore" - "golang.org/x/xerrors" + "github.com/minio/blake2b-simd" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" vstate "github.com/filecoin-project/chain-validation/pkg/state" vactors "github.com/filecoin-project/chain-validation/pkg/state/actors" @@ -327,7 +327,7 @@ func fromActorCode(code vactors.ActorCodeID) cid.Cid { } } -func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address{ +func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address { switch addr { case vactors.InitAddress: out, err := vaddress.NewFromBytes(actors.InitAddress.Bytes()) @@ -336,25 +336,25 @@ func fromSingletonAddress(addr vactors.SingletonActorID) vaddress.Address{ } return out case vactors.NetworkAddress: - out, err := vaddress.NewFromBytes(actors.NetworkAddress.Bytes()) + out, err := vaddress.NewFromBytes(actors.NetworkAddress.Bytes()) if err != nil { panic(err) } return out case vactors.StorageMarketAddress: - out, err := vaddress.NewFromBytes(actors.StorageMarketAddress.Bytes()) + out, err := vaddress.NewFromBytes(actors.StorageMarketAddress.Bytes()) if err != nil { panic(err) } return out case vactors.BurntFundsAddress: - out, err := vaddress.NewFromBytes(actors.BurntFundsAddress.Bytes()) + out, err := vaddress.NewFromBytes(actors.BurntFundsAddress.Bytes()) if err != nil { panic(err) } return out case vactors.StoragePowerAddress: - out, err := vaddress.NewFromBytes(actors.StoragePowerAddress.Bytes()) + out, err := vaddress.NewFromBytes(actors.StoragePowerAddress.Bytes()) if err != nil { panic(err) } diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go index 2a899c100..dc93c0c58 100644 --- a/cmd/lotus-chainwatch/dot.go +++ b/cmd/lotus-chainwatch/dot.go @@ -10,8 +10,8 @@ import ( ) var dotCmd = &cli.Command{ - Name: "dot", - Usage: "generate dot graphs", + Name: "dot", + Usage: "generate dot graphs", ArgsUsage: " ", Action: func(cctx *cli.Context) error { st, err := openStorage(cctx.String("db")) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index e4a74ec7e..c14644cb4 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -1,9 +1,10 @@ package main import ( - "github.com/mitchellh/go-homedir" "os" + "github.com/mitchellh/go-homedir" + logging "github.com/ipfs/go-log" "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" @@ -44,6 +45,12 @@ func main() { Usage: "enable use of GPU for mining operations", Value: true, }, + &cli.BoolFlag{ + Name: "no-precommit", + }, + &cli.BoolFlag{ + Name: "no-commit", + }, }, Commands: local, @@ -95,6 +102,6 @@ var runCmd = &cli.Command{ log.Warn("Shutting down..") }() - return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r) + return acceptJobs(ctx, nodeApi, "http://"+storageAddr, auth, r, cctx.Bool("no-precommit"), cctx.Bool("no-commit")) }, } diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index b3cb828e0..9c5950b28 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -7,12 +7,13 @@ import ( "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/api" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/lib/sectorbuilder" ) type worker struct { - api api.StorageMiner + api lapi.StorageMiner minerEndpoint string repo string auth http.Header @@ -20,7 +21,7 @@ type worker struct { sb *sectorbuilder.SectorBuilder } -func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth http.Header, repo string) error { +func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, auth http.Header, repo string, noprecommit, nocommit bool) error { act, err := api.ActorAddress(ctx) if err != nil { return err @@ -43,6 +44,10 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth return err } + if err := build.GetParams(ssize); err != nil { + return xerrors.Errorf("get params: %w", err) + } + w := &worker{ api: api, minerEndpoint: endpoint, @@ -51,13 +56,18 @@ func acceptJobs(ctx context.Context, api api.StorageMiner, endpoint string, auth sb: sb, } - tasks, err := api.WorkerQueue(ctx) + tasks, err := api.WorkerQueue(ctx, sectorbuilder.WorkerCfg{ + NoPreCommit: noprecommit, + NoCommit: nocommit, + }) if err != nil { return err } loop: for { + log.Infof("Waiting for new task") + select { case task := <-tasks: log.Infof("New task: %d, sector %d, action: %d", task.TaskID, task.SectorID, task.Type) @@ -90,6 +100,8 @@ func (w *worker) processTask(ctx context.Context, task sectorbuilder.WorkerTask) return errRes(xerrors.Errorf("fetching sector: %w", err)) } + log.Infof("Data fetched, starting computation") + var res sectorbuilder.SealRes switch task.Type { diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index 68727c056..e091edadf 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -12,7 +12,7 @@ import ( "path/filepath" "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/lib/systar" + "github.com/filecoin-project/lotus/lib/tarutil" ) func (w *worker) fetch(typ string, sectorID uint64) error { @@ -58,7 +58,7 @@ func (w *worker) fetch(typ string, sectorID uint64) error { switch mediatype { case "application/x-tar": - return systar.ExtractTar(barreader, filepath.Dir(outname)) + return tarutil.ExtractTar(barreader, outname) case "application/octet-stream": return files.WriteTo(files.NewReaderFile(barreader), outname) default: @@ -80,7 +80,7 @@ func (w *worker) push(typ string, sectorID uint64) error { var r io.Reader if stat.IsDir() { - r, err = systar.TarDirectory(filename) + r, err = tarutil.TarDirectory(filename) } else { r, err = os.OpenFile(filename, os.O_RDONLY, 0644) } diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 393b04321..154d38fa0 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -63,6 +63,12 @@ var infoCmd = &cli.Command{ fmt.Printf("\tLocal: %d / %d (+%d reserved)\n", wstat.LocalTotal-wstat.LocalReserved-wstat.LocalFree, wstat.LocalTotal-wstat.LocalReserved, wstat.LocalReserved) fmt.Printf("\tRemote: %d / %d\n", wstat.RemotesTotal-wstat.RemotesFree, wstat.RemotesTotal) + fmt.Printf("Queues:\n") + fmt.Printf("\tAddPiece: %d\n", wstat.AddPieceWait) + fmt.Printf("\tPreCommit: %d\n", wstat.PreCommitWait) + fmt.Printf("\tCommit: %d\n", wstat.CommitWait) + fmt.Printf("\tUnseal: %d\n", wstat.UnsealWait) + eps, err := api.StateMinerElectionPeriodStart(ctx, maddr, nil) if err != nil { return err diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index c51d07ba1..c396f2def 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -249,10 +249,8 @@ func migratePreSealMeta(ctx context.Context, api lapi.FullNode, presealDir strin CommP: sector.CommD[:], }, }, - CommC: nil, CommD: sector.CommD[:], CommR: sector.CommR[:], - CommRLast: nil, Proof: nil, Ticket: storage.SealTicket{}, PreCommitMessage: nil, @@ -358,7 +356,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2)(mds, api) + sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2, false, false)(mds, api) if err != nil { return xerrors.Errorf("getting genesis miner sector builder config: %w", err) } diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 31a7ceaa7..1c4b67083 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "sort" "strconv" "golang.org/x/xerrors" @@ -125,6 +126,10 @@ var sectorsListCmd = &cli.Command{ commitedIDs[info.SectorID] = struct{}{} } + sort.Slice(list, func(i, j int) bool { + return list[i] < list[j] + }) + for _, s := range list { st, err := nodeApi.SectorsStatus(ctx, s) if err != nil { diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 6d9e80001..6ac840062 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 6d9e80001bfa2d80eec4e157da46d783038d9b42 +Subproject commit 6ac840062c094b35c87a87638cd7a262f43edd49 diff --git a/lib/sectorbuilder/remote.go b/lib/sectorbuilder/remote.go index 5add710ff..9fa4190ff 100644 --- a/lib/sectorbuilder/remote.go +++ b/lib/sectorbuilder/remote.go @@ -33,7 +33,7 @@ type workerCall struct { ret chan SealRes } -func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, error) { +func (sb *SectorBuilder) AddWorker(ctx context.Context, cfg WorkerCfg) (<-chan WorkerTask, error) { sb.remoteLk.Lock() defer sb.remoteLk.Unlock() @@ -46,22 +46,32 @@ func (sb *SectorBuilder) AddWorker(ctx context.Context) (<-chan WorkerTask, erro sb.remoteCtr++ sb.remotes[sb.remoteCtr] = r - go sb.remoteWorker(ctx, r) + go sb.remoteWorker(ctx, r, cfg) return taskCh, nil } func (sb *SectorBuilder) returnTask(task workerCall) { + var ret chan workerCall + switch task.task.Type { + case WorkerPreCommit: + ret = sb.precommitTasks + case WorkerCommit: + ret = sb.commitTasks + default: + log.Error("unknown task type", task.task.Type) + } + go func() { select { - case sb.sealTasks <- task: + case ret <- task: case <-sb.stopping: return } }() } -func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { +func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote, cfg WorkerCfg) { defer log.Warn("Remote worker disconnected") defer func() { @@ -76,47 +86,21 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { } }() + precommits := sb.precommitTasks + if cfg.NoPreCommit { + precommits = nil + } + commits := sb.commitTasks + if cfg.NoCommit { + commits = nil + } + for { select { - case task := <-sb.sealTasks: - resCh := make(chan SealRes) - - sb.remoteLk.Lock() - sb.remoteResults[task.task.TaskID] = resCh - sb.remoteLk.Unlock() - - // send the task - select { - case r.sealTasks <- task.task: - case <-ctx.Done(): - sb.returnTask(task) - return - } - - r.lk.Lock() - r.busy = task.task.TaskID - r.lk.Unlock() - - // wait for the result - select { - case res := <-resCh: - - // send the result back to the caller - select { - case task.ret <- res: - case <-ctx.Done(): - return - case <-sb.stopping: - return - } - - case <-ctx.Done(): - log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err()) - return - case <-sb.stopping: - return - } - + case task := <-commits: + sb.doTask(ctx, r, task) + case task := <-precommits: + sb.doTask(ctx, r, task) case <-ctx.Done(): return case <-sb.stopping: @@ -129,6 +113,46 @@ func (sb *SectorBuilder) remoteWorker(ctx context.Context, r *remote) { } } +func (sb *SectorBuilder) doTask(ctx context.Context, r *remote, task workerCall) { + resCh := make(chan SealRes) + + sb.remoteLk.Lock() + sb.remoteResults[task.task.TaskID] = resCh + sb.remoteLk.Unlock() + + // send the task + select { + case r.sealTasks <- task.task: + case <-ctx.Done(): + sb.returnTask(task) + return + } + + r.lk.Lock() + r.busy = task.task.TaskID + r.lk.Unlock() + + // wait for the result + select { + case res := <-resCh: + + // send the result back to the caller + select { + case task.ret <- res: + case <-ctx.Done(): + return + case <-sb.stopping: + return + } + + case <-ctx.Done(): + log.Warnf("context expired while waiting for task %d (sector %d): %s", task.task.TaskID, task.task.SectorID, ctx.Err()) + return + case <-sb.stopping: + return + } +} + func (sb *SectorBuilder) TaskDone(ctx context.Context, task uint64, res SealRes) error { sb.remoteLk.Lock() rres, ok := sb.remoteResults[task] diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index 01044bd7d..f7c61ebf8 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -45,6 +45,13 @@ type EPostCandidate = sectorbuilder.Candidate const CommLen = sectorbuilder.CommitmentBytesLen +type WorkerCfg struct { + NoPreCommit bool + NoCommit bool + + // TODO: 'cost' info, probably in terms of sealing + transfer speed +} + type SectorBuilder struct { ds dtypes.MetadataDS idLk sync.Mutex @@ -61,10 +68,12 @@ type SectorBuilder struct { unsealLk sync.Mutex - sealLocal bool - rateLimit chan struct{} + noCommit bool + noPreCommit bool + rateLimit chan struct{} - sealTasks chan workerCall + precommitTasks chan workerCall + commitTasks chan workerCall taskCtr uint64 remoteLk sync.Mutex @@ -72,31 +81,30 @@ type SectorBuilder struct { remotes map[int]*remote remoteResults map[uint64]chan<- SealRes + addPieceWait int32 + preCommitWait int32 + commitWait int32 + unsealWait int32 + stopping chan struct{} } type JsonRSPCO struct { - CommC []byte - CommD []byte - CommR []byte - CommRLast []byte + CommD []byte + CommR []byte } func (rspco *RawSealPreCommitOutput) ToJson() JsonRSPCO { return JsonRSPCO{ - CommC: rspco.CommC[:], - CommD: rspco.CommD[:], - CommR: rspco.CommR[:], - CommRLast: rspco.CommRLast[:], + CommD: rspco.CommD[:], + CommR: rspco.CommR[:], } } func (rspco *JsonRSPCO) rspco() RawSealPreCommitOutput { var out RawSealPreCommitOutput - copy(out.CommC[:], rspco.CommC) copy(out.CommD[:], rspco.CommD) copy(out.CommR[:], rspco.CommR) - copy(out.CommRLast[:], rspco.CommRLast) return out } @@ -121,6 +129,8 @@ type Config struct { WorkerThreads uint8 FallbackLastID uint64 + NoCommit bool + NoPreCommit bool CacheDir string SealedDir string @@ -179,13 +189,15 @@ func New(cfg *Config, ds dtypes.MetadataDS) (*SectorBuilder, error) { Miner: cfg.Miner, - sealLocal: sealLocal, - rateLimit: make(chan struct{}, rlimit), + noPreCommit: cfg.NoPreCommit || !sealLocal, + noCommit: cfg.NoCommit || !sealLocal, + rateLimit: make(chan struct{}, rlimit), - taskCtr: 1, - sealTasks: make(chan workerCall), - remoteResults: map[uint64]chan<- SealRes{}, - remotes: map[int]*remote{}, + taskCtr: 1, + precommitTasks: make(chan workerCall), + commitTasks: make(chan workerCall), + remoteResults: map[uint64]chan<- SealRes{}, + remotes: map[int]*remote{}, stopping: make(chan struct{}), } @@ -214,7 +226,6 @@ func NewStandalone(cfg *Config) (*SectorBuilder, error) { cacheDir: cfg.CacheDir, unsealedDir: cfg.UnsealedDir, - sealLocal: true, taskCtr: 1, remotes: map[int]*remote{}, rateLimit: make(chan struct{}, cfg.WorkerThreads), @@ -245,6 +256,11 @@ type WorkerStats struct { // todo: post in progress RemotesTotal int RemotesFree int + + AddPieceWait int + PreCommitWait int + CommitWait int + UnsealWait int } func (sb *SectorBuilder) WorkerStats() WorkerStats { @@ -264,6 +280,11 @@ func (sb *SectorBuilder) WorkerStats() WorkerStats { LocalTotal: cap(sb.rateLimit) + PoStReservedWorkers, RemotesTotal: len(sb.remotes), RemotesFree: remoteFree, + + AddPieceWait: int(atomic.LoadInt32(&sb.addPieceWait)), + PreCommitWait: int(atomic.LoadInt32(&sb.preCommitWait)), + CommitWait: int(atomic.LoadInt32(&sb.commitWait)), + UnsealWait: int(atomic.LoadInt32(&sb.unsealWait)), } } @@ -288,7 +309,9 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) { } func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) { + atomic.AddInt32(&sb.addPieceWait, 1) ret := sb.RateLimit() + atomic.AddInt32(&sb.addPieceWait, -1) defer ret() f, werr, err := toReadableFile(file, int64(pieceSize)) @@ -321,8 +344,11 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea } func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { + atomic.AddInt32(&sb.unsealWait, 1) + // TODO: Don't wait if cached ret := sb.RateLimit() // TODO: check perf, consider remote unseal worker defer ret() + atomic.AddInt32(&sb.unsealWait, -1) sb.unsealLk.Lock() // TODO: allow unsealing unrelated sectors in parallel defer sb.unsealLk.Unlock() @@ -389,6 +415,8 @@ func (sb *SectorBuilder) ReadPieceFromSealedSector(sectorID uint64, offset uint6 } func (sb *SectorBuilder) sealPreCommitRemote(call workerCall) (RawSealPreCommitOutput, error) { + atomic.AddInt32(&sb.preCommitWait, -1) + select { case ret := <-call.ret: var err error @@ -413,8 +441,10 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece ret: make(chan SealRes), } + atomic.AddInt32(&sb.preCommitWait, 1) + select { // prefer remote - case sb.sealTasks <- call: + case sb.precommitTasks <- call: return sb.sealPreCommitRemote(call) default: } @@ -422,16 +452,18 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece sb.checkRateLimit() rl := sb.rateLimit - if !sb.sealLocal { + if sb.noPreCommit { rl = make(chan struct{}) } select { // use whichever is available - case sb.sealTasks <- call: + case sb.precommitTasks <- call: return sb.sealPreCommitRemote(call) case rl <- struct{}{}: } + atomic.AddInt32(&sb.preCommitWait, -1) + // local defer func() { @@ -474,10 +506,14 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece return RawSealPreCommitOutput{}, xerrors.Errorf("presealing sector %d (%s): %w", sectorID, stagedPath, err) } + log.Infof("PRECOMMIT FFI RSPCO %v", rspco) + return RawSealPreCommitOutput(rspco), nil } func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err error) { + atomic.AddInt32(&sb.commitWait, -1) + select { case ret := <-call.ret: if ret.Err != "" { @@ -490,6 +526,8 @@ func (sb *SectorBuilder) sealCommitRemote(call workerCall) (proof []byte, err er } func (sb *SectorBuilder) sealCommitLocal(sectorID uint64, ticket SealTicket, seed SealSeed, pieces []PublicPieceInfo, rspco RawSealPreCommitOutput) (proof []byte, err error) { + atomic.AddInt32(&sb.commitWait, -1) + defer func() { <-sb.rateLimit }() @@ -535,19 +573,21 @@ func (sb *SectorBuilder) SealCommit(sectorID uint64, ticket SealTicket, seed Sea ret: make(chan SealRes), } + atomic.AddInt32(&sb.commitWait, 1) + select { // prefer remote - case sb.sealTasks <- call: + case sb.commitTasks <- call: proof, err = sb.sealCommitRemote(call) default: sb.checkRateLimit() rl := sb.rateLimit - if !sb.sealLocal { + if sb.noCommit { rl = make(chan struct{}) } select { // use whichever is available - case sb.sealTasks <- call: + case sb.commitTasks <- call: proof, err = sb.sealCommitRemote(call) case rl <- struct{}{}: proof, err = sb.sealCommitLocal(sectorID, ticket, seed, pieces, rspco) diff --git a/lib/sectorbuilder/sectorbuilder_test.go b/lib/sectorbuilder/sectorbuilder_test.go index 36520374e..13c80f0fc 100644 --- a/lib/sectorbuilder/sectorbuilder_test.go +++ b/lib/sectorbuilder/sectorbuilder_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -21,6 +22,10 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) +func init() { + logging.SetLogLevel("*", "INFO") +} + const sectorSize = 1024 type seal struct { diff --git a/lib/systar/systar.go b/lib/systar/systar.go deleted file mode 100644 index c83999376..000000000 --- a/lib/systar/systar.go +++ /dev/null @@ -1,47 +0,0 @@ -package systar - -import ( - "golang.org/x/xerrors" - "io" - "os" - "os/exec" - "path/filepath" - - logging "github.com/ipfs/go-log" -) - -var log = logging.Logger("systar") - -func ExtractTar(body io.Reader, dest string) error { - if err := os.MkdirAll(dest, 0755); err != nil { - return xerrors.Errorf("creating dest directory: %w", err) - } - - cmd := exec.Command("tar", "-xS", "-C", dest) - cmd.Stdin = body - return cmd.Run() -} - -func TarDirectory(file string) (io.ReadCloser, error) { - // use system builtin tar, golang one doesn't support sparse files - - dir := filepath.Dir(file) - base := filepath.Base(file) - - i, o := io.Pipe() - - // don't bother with compression, it's mostly random data - cmd := exec.Command("tar", "-cSf", "-", "-C", dir, base) - cmd.Stdout = o - if err := cmd.Start(); err != nil { - return nil, err - } - - go func() { - if err := o.CloseWithError(cmd.Wait()); err != nil { - log.Error(err) - } - }() - - return i, nil -} diff --git a/lib/tarutil/systar.go b/lib/tarutil/systar.go new file mode 100644 index 000000000..d5214108e --- /dev/null +++ b/lib/tarutil/systar.go @@ -0,0 +1,88 @@ +package tarutil + +import ( + "archive/tar" + "golang.org/x/xerrors" + "io" + "io/ioutil" + "os" + "path/filepath" + + logging "github.com/ipfs/go-log" +) + +var log = logging.Logger("tarutil") + +func ExtractTar(body io.Reader, dir string) error { + if err := os.MkdirAll(dir, 0755); err != nil { + return xerrors.Errorf("mkdir: %w", err) + } + + tr := tar.NewReader(body) + for { + header, err := tr.Next() + switch err { + default: + return err + case io.EOF: + return nil + + case nil: + } + + f, err := os.Create(filepath.Join(dir, header.Name)) + if err != nil { + return xerrors.Errorf("creating file %s: %w", filepath.Join(dir, header.Name), err) + } + + if _, err := io.Copy(f, tr); err != nil { + return err + } + } +} + +func TarDirectory(dir string) (io.ReadCloser, error) { + r, w := io.Pipe() + + go func() { + _ = w.CloseWithError(writeTarDirectory(dir, w)) + }() + + return r, nil +} + +func writeTarDirectory(dir string, w io.Writer) error { + tw := tar.NewWriter(w) + + files, err := ioutil.ReadDir(dir) + if err != nil { + return err + } + + for _, file := range files { + h, err := tar.FileInfoHeader(file, "") + if err != nil { + return xerrors.Errorf("getting header for file %s: %w", file.Name(), err) + } + + if err := tw.WriteHeader(h); err != nil { + return xerrors.Errorf("wiritng header for file %s: %w", file.Name(), err) + } + + f, err := os.OpenFile(filepath.Join(dir, file.Name()), os.O_RDONLY, 644) + if err != nil { + return xerrors.Errorf("opening %s for reading: %w", file.Name(), err) + } + + if _, err := io.Copy(tw, f); err != nil { + return xerrors.Errorf("copy data for file %s: %w", file.Name(), err) + } + + if err := f.Close(); err != nil { + return err + } + + } + + return nil +} diff --git a/node/builder.go b/node/builder.go index eedacadaf..4ab041eaf 100644 --- a/node/builder.go +++ b/node/builder.go @@ -334,7 +334,10 @@ func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { return Options( ConfigCommon(&cfg.Common), - Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, cfg.SectorBuilder.WorkerCount)), + Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, + cfg.SectorBuilder.WorkerCount, + cfg.SectorBuilder.DisableLocalPreCommit, + cfg.SectorBuilder.DisableLocalCommit)), ) } diff --git a/node/config/def.go b/node/config/def.go index 9391708a6..80b6f199f 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -50,6 +50,9 @@ type Metrics struct { type SectorBuilder struct { Path string WorkerCount uint + + DisableLocalPreCommit bool + DisableLocalCommit bool } func defCommon() Common { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index d79156c0e..1934bec80 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -7,17 +7,17 @@ import ( "mime" "net/http" "os" - "path/filepath" + + "github.com/gorilla/mux" + files "github.com/ipfs/go-ipfs-files" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/lib/sectorbuilder" - "github.com/filecoin-project/lotus/lib/systar" + "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" "github.com/filecoin-project/lotus/storage/sectorblocks" - "github.com/gorilla/mux" - files "github.com/ipfs/go-ipfs-files" ) type StorageMinerAPI struct { @@ -68,7 +68,7 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques var rd io.Reader if stat.IsDir() { - rd, err = systar.TarDirectory(path) + rd, err = tarutil.TarDirectory(path) w.Header().Set("Content-Type", "application/x-tar") } else { rd, err = os.OpenFile(path, os.O_RDONLY, 0644) @@ -112,7 +112,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques switch mediatype { case "application/x-tar": - if err := systar.ExtractTar(r.Body, filepath.Dir(path)); err != nil { + if err := tarutil.ExtractTar(r.Body, path); err != nil { log.Error(err) w.WriteHeader(500) return @@ -206,8 +206,8 @@ func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id uint64, state a return sm.Miner.UpdateSectorState(ctx, id, state) } -func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context) (<-chan sectorbuilder.WorkerTask, error) { - return sm.SectorBuilder.AddWorker(ctx) +func (sm *StorageMinerAPI) WorkerQueue(ctx context.Context, cfg sectorbuilder.WorkerCfg) (<-chan sectorbuilder.WorkerTask, error) { + return sm.SectorBuilder.AddWorker(ctx, cfg) } func (sm *StorageMinerAPI) WorkerDone(ctx context.Context, task uint64, res sectorbuilder.SealRes) error { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index a6a2300e1..4045c1cc2 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -52,7 +52,7 @@ func GetParams(sbc *sectorbuilder.Config) error { return nil } -func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { +func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { @@ -79,9 +79,12 @@ func SectorBuilderConfig(storagePath string, threads uint) func(dtypes.MetadataD staging := filepath.Join(sp, "staging") sb := §orbuilder.Config{ - Miner: minerAddr, - SectorSize: ssize, + Miner: minerAddr, + SectorSize: ssize, + WorkerThreads: uint8(threads), + NoPreCommit: noprecommit, + NoCommit: nocommit, CacheDir: cache, UnsealedDir: unsealed, diff --git a/storage/cbor_gen.go b/storage/cbor_gen.go index 41de25822..aa1eabd47 100644 --- a/storage/cbor_gen.go +++ b/storage/cbor_gen.go @@ -263,11 +263,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } - // t.t.CommC ([]uint8) (slice) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommC)))); err != nil { + // t.t.Pad0 ([]uint8) (slice) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad0)))); err != nil { return err } - if _, err := w.Write(t.CommC); err != nil { + if _, err := w.Write(t.Pad0); err != nil { return err } @@ -287,11 +287,11 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } - // t.t.CommRLast ([]uint8) (slice) - if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.CommRLast)))); err != nil { + // t.t.Pad1 ([]uint8) (slice) + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.Pad1)))); err != nil { return err } - if _, err := w.Write(t.CommRLast); err != nil { + if _, err := w.Write(t.Pad1); err != nil { return err } @@ -408,21 +408,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Pieces[i] = v } - // t.t.CommC ([]uint8) (slice) + // t.t.Pad0 ([]uint8) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("t.CommC: array too large (%d)", extra) + return fmt.Errorf("t.Pad0: array too large (%d)", extra) } if maj != cbg.MajByteString { return fmt.Errorf("expected byte array") } - t.CommC = make([]byte, extra) - if _, err := io.ReadFull(br, t.CommC); err != nil { + t.Pad0 = make([]byte, extra) + if _, err := io.ReadFull(br, t.Pad0); err != nil { return err } // t.t.CommD ([]uint8) (slice) @@ -459,21 +459,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { if _, err := io.ReadFull(br, t.CommR); err != nil { return err } - // t.t.CommRLast ([]uint8) (slice) + // t.t.Pad1 ([]uint8) (slice) maj, extra, err = cbg.CborReadHeader(br) if err != nil { return err } if extra > 8192 { - return fmt.Errorf("t.CommRLast: array too large (%d)", extra) + return fmt.Errorf("t.Pad1: array too large (%d)", extra) } if maj != cbg.MajByteString { return fmt.Errorf("expected byte array") } - t.CommRLast = make([]byte, extra) - if _, err := io.ReadFull(br, t.CommRLast); err != nil { + t.Pad1 = make([]byte, extra) + if _, err := io.ReadFull(br, t.Pad1); err != nil { return err } // t.t.Proof ([]uint8) (slice) diff --git a/storage/sector_states.go b/storage/sector_states.go index 8369a5722..5d2a616a8 100644 --- a/storage/sector_states.go +++ b/storage/sector_states.go @@ -75,10 +75,8 @@ func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUp } return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) { - info.CommC = rspco.CommC[:] info.CommD = rspco.CommD[:] info.CommR = rspco.CommR[:] - info.CommRLast = rspco.CommRLast[:] info.Ticket = SealTicket{ BlockHeight: ticket.BlockHeight, TicketBytes: ticket.TicketBytes[:], diff --git a/storage/sector_types.go b/storage/sector_types.go index 20dff0161..7a0a975c1 100644 --- a/storage/sector_types.go +++ b/storage/sector_types.go @@ -55,12 +55,12 @@ type SectorInfo struct { Pieces []Piece // PreCommit - CommC []byte - CommD []byte - CommR []byte - CommRLast []byte - Proof []byte - Ticket SealTicket + Pad0 []byte // TODO: legacy placeholder, remove + CommD []byte + CommR []byte + Pad1 []byte // TODO: legacy placeholder, remove + Proof []byte + Ticket SealTicket PreCommitMessage *cid.Cid @@ -105,10 +105,8 @@ func (t *SectorInfo) existingPieces() []uint64 { func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { var out sectorbuilder.RawSealPreCommitOutput - copy(out.CommC[:], t.CommC) copy(out.CommD[:], t.CommD) copy(out.CommR[:], t.CommR) - copy(out.CommRLast[:], t.CommRLast) return out } diff --git a/storage/sectors.go b/storage/sectors.go index 72f380a81..62f1db1d8 100644 --- a/storage/sectors.go +++ b/storage/sectors.go @@ -167,9 +167,11 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) { var sector SectorInfo err := m.sectors.Mutate(update.id, func(s *SectorInfo) error { s.State = update.newState - s.LastErr = "" if update.err != nil { - s.LastErr = fmt.Sprintf("%+v", update.err) + if s.LastErr != "" { + s.LastErr += "---------\n\n" + } + s.LastErr += fmt.Sprintf("entering state %s: %+v", api.SectorStates[update.newState], update.err) } if update.mut != nil { From ac8d8fb67154cea4799f9478242d87b552e08023 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sat, 7 Dec 2019 19:08:43 +0100 Subject: [PATCH 2/2] Update to correct version License: MIT Signed-off-by: Jakub Sztandera --- extern/filecoin-ffi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 6ac840062..fe188cfa1 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 6ac840062c094b35c87a87638cd7a262f43edd49 +Subproject commit fe188cfa1e082e4c41cadeb17a3a7c9e43ae6f03