From 5af64c53b65638a2614b144aa8e719d9be66af31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 00:08:02 +0100 Subject: [PATCH 01/17] initial sectorbuilder FS refactor integration --- cmd/lotus-bench/main.go | 6 ++--- cmd/lotus-chainwatch/sync.go | 1 - cmd/lotus-seal-worker/sub.go | 2 +- cmd/lotus-seed/main.go | 4 +-- cmd/lotus-seed/seed/seed.go | 6 ++--- cmd/lotus-storage-miner/init.go | 4 +-- go.mod | 2 ++ node/impl/storminer.go | 39 +++++++++++++++++++++--------- node/modules/storageminer.go | 2 +- storage/sealing/garbage.go | 4 +-- storage/sealing/sealing.go | 2 +- storage/sectorblocks/blockstore.go | 1 + 12 files changed, 45 insertions(+), 28 deletions(-) diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 86939459a..b93600508 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -146,7 +146,7 @@ func main() { Miner: maddr, SectorSize: sectorSize, WorkerThreads: 2, - Dir: sbdir, + Paths: sectorbuilder.SimplePath(sbdir), } if robench == "" { @@ -174,7 +174,7 @@ func main() { r := rand.New(rand.NewSource(100 + int64(i))) - pi, err := sb.AddPiece(dataSize, i, r, nil) + pi, err := sb.AddPiece(context.TODO(), dataSize, i, r, nil) if err != nil { return err } @@ -225,7 +225,7 @@ func main() { if !c.Bool("skip-unseal") { log.Info("Unsealing sector") - rc, err := sb.ReadPieceFromSealedSector(1, 0, dataSize, ticket.TicketBytes[:], commD[:]) + rc, err := sb.ReadPieceFromSealedSector(context.TODO(), 1, 0, dataSize, ticket.TicketBytes[:], commD[:]) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index bf91c97d8..c40a4f8bc 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -41,7 +41,6 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) { go subMpool(ctx, api, st) go subBlocks(ctx, api, st) } - } } }() diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index f79c691c9..1ff57c859 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -35,7 +35,7 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut SectorSize: ssize, Miner: act, WorkerThreads: 1, - Dir: repo, + Paths: sectorbuilder.SimplePath(repo), }) if err != nil { return err diff --git a/cmd/lotus-seed/main.go b/cmd/lotus-seed/main.go index f3f529af0..b30f523c9 100644 --- a/cmd/lotus-seed/main.go +++ b/cmd/lotus-seed/main.go @@ -196,7 +196,7 @@ var aggregateSectorDirsCmd = &cli.Command{ agsb, err := sectorbuilder.New(§orbuilder.Config{ Miner: maddr, SectorSize: ssize, - Dir: destdir, + Paths: sectorbuilder.SimplePath(destdir), WorkerThreads: 2, }, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder"))) if err != nil { @@ -257,7 +257,7 @@ var aggregateSectorDirsCmd = &cli.Command{ sb, err := sectorbuilder.New(§orbuilder.Config{ Miner: maddr, SectorSize: genm.SectorSize, - Dir: dir, + Paths: sectorbuilder.SimplePath(dir), WorkerThreads: 2, }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index d4c00e794..57fbd9e19 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -32,7 +32,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb Miner: maddr, SectorSize: ssize, FallbackLastID: offset, - Dir: sbroot, + Paths: sectorbuilder.SimplePath(sbroot), WorkerThreads: 2, } @@ -59,7 +59,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb return nil, err } - pi, err := sb.AddPiece(size, sid, rand.Reader, nil) + pi, err := sb.AddPiece(context.TODO(), size, sid, rand.Reader, nil) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb return nil, xerrors.Errorf("commit: %w", err) } - if err := sb.TrimCache(sid); err != nil { + if err := sb.TrimCache(context.TODO(), sid); err != nil { return nil, xerrors.Errorf("trim cache: %w", err) } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 18b31c9b6..42abae157 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -177,7 +177,7 @@ var initCmd = &cli.Command{ oldsb, err := sectorbuilder.New(§orbuilder.Config{ SectorSize: ssize, WorkerThreads: 2, - Dir: pssb, + Paths: sectorbuilder.SimplePath(pssb), }, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder"))) if err != nil { return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err) @@ -186,7 +186,7 @@ var initCmd = &cli.Command{ nsb, err := sectorbuilder.New(§orbuilder.Config{ SectorSize: ssize, WorkerThreads: 2, - Dir: lr.Path(), + Paths: sectorbuilder.SimplePath(lr.Path()), }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { return xerrors.Errorf("failed to open up sectorbuilder: %w", err) diff --git a/go.mod b/go.mod index d261af44c..46149dafd 100644 --- a/go.mod +++ b/go.mod @@ -113,3 +113,5 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 + +replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b600dfb7f..961d27a70 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -7,15 +7,16 @@ import ( "mime" "net/http" "os" - - "github.com/filecoin-project/lotus/api/apistruct" + "strconv" "github.com/gorilla/mux" files "github.com/ipfs/go-ipfs-files" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" @@ -43,8 +44,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { mux := mux.NewRouter() - mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") - mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") + mux.HandleFunc("/remote/{type}/{id}", sm.remoteGetSector).Methods("GET") + mux.HandleFunc("/remote/{type}/{id}", sm.remotePutSector).Methods("PUT") log.Infof("SERVEGETREMOTE %s", r.URL) @@ -54,14 +55,21 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) + id, err := strconv.ParseUint(vars["id"], 10, 64) + if err != nil { + log.Error("parsing sector id: ", err) + w.WriteHeader(500) + return + } + + path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id) if err != nil { log.Error(err) w.WriteHeader(500) return } - stat, err := os.Stat(path) + stat, err := os.Stat(string(path)) if err != nil { log.Error(err) w.WriteHeader(500) @@ -70,10 +78,10 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques var rd io.Reader if stat.IsDir() { - rd, err = tarutil.TarDirectory(path) + rd, err = tarutil.TarDirectory(string(path)) w.Header().Set("Content-Type", "application/x-tar") } else { - rd, err = os.OpenFile(path, os.O_RDONLY, 0644) + rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644) w.Header().Set("Content-Type", "application/octet-stream") } if err != nil { @@ -92,7 +100,14 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) + id, err := strconv.ParseUint(vars["id"], 10, 64) + if err != nil { + log.Error("parsing sector id: ", err) + w.WriteHeader(500) + return + } + + path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id) if err != nil { log.Error(err) w.WriteHeader(500) @@ -106,7 +121,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - if err := os.RemoveAll(path); err != nil { + if err := os.RemoveAll(string(path)); err != nil { log.Error(err) w.WriteHeader(500) return @@ -114,13 +129,13 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques switch mediatype { case "application/x-tar": - if err := tarutil.ExtractTar(r.Body, path); err != nil { + if err := tarutil.ExtractTar(r.Body, string(path)); err != nil { log.Error(err) w.WriteHeader(500) return } default: - if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil { + if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil { log.Error(err) w.WriteHeader(500) return diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 533946696..e2c1b44c0 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -91,7 +91,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit NoPreCommit: noprecommit, NoCommit: nocommit, - Dir: sp, + Paths: sectorbuilder.SimplePath(sp), } return sb, nil diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 1c3925671..32dff004f 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -81,9 +81,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes) if err != nil { - return nil, err + return nil, xerrors.Errorf("add piece: %w", err) } existingPieceSizes = append(existingPieceSizes, size) diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 6d0c6bb46..e6e45c0eb 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -112,7 +112,7 @@ func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, er func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []uint64{}) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go index 36394c1c8..63539e2f7 100644 --- a/storage/sectorblocks/blockstore.go +++ b/storage/sectorblocks/blockstore.go @@ -96,6 +96,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) { log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size) r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector( + context.TODO(), best.SectorID, best.Offset, best.Size, From 3f58ffe572fe9204ad683c900d624d517ed9a387 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 19:10:41 +0100 Subject: [PATCH 02/17] storageminer: New storage config --- cmd/lotus-storage-miner/init.go | 2 +- node/builder.go | 16 ++++++++++++---- node/config/def.go | 5 ++++- node/modules/storageminer.go | 13 ++++++++----- 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 42abae157..232893f9d 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -369,7 +369,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2, false, false)(mds, api) + sbcfg, err := modules.SectorBuilderConfig(sectorbuilder.SimplePath(lr.Path()), 2, false, false)(mds, api) if err != nil { return xerrors.Errorf("getting genesis miner sector builder config: %w", err) } diff --git a/node/builder.go b/node/builder.go index 5ec6e4183..028778c09 100644 --- a/node/builder.go +++ b/node/builder.go @@ -7,6 +7,7 @@ import ( sectorbuilder "github.com/filecoin-project/go-sectorbuilder" blockstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -54,6 +55,8 @@ import ( "github.com/filecoin-project/lotus/storage/sectorblocks" ) +var log = logging.Logger("builder") + // special is a type used to give keys to modules which // can't really be identified by the returned type type special struct{ id int } @@ -342,15 +345,20 @@ func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } - path := cfg.SectorBuilder.Path - if path == "" { - path = lr.Path() + scfg := sectorbuilder.SimplePath(lr.Path()) + if cfg.SectorBuilder.Path == "" { + if len(cfg.SectorBuilder.Storage) > 0 { + scfg = cfg.SectorBuilder.Storage + } + } else { + scfg = sectorbuilder.SimplePath(cfg.SectorBuilder.Path) + log.Warn("LEGACY SectorBuilder.Path FOUND IN CONFIG. Please use the new storage config") } return Options( ConfigCommon(&cfg.Common), - Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, + Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(scfg, cfg.SectorBuilder.WorkerCount, cfg.SectorBuilder.DisableLocalPreCommit, cfg.SectorBuilder.DisableLocalCommit)), diff --git a/node/config/def.go b/node/config/def.go index 97ed036d5..43007b684 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -3,6 +3,8 @@ package config import ( "encoding" "time" + + "github.com/filecoin-project/go-sectorbuilder/fs" ) // Common is common config between full node and miner @@ -54,7 +56,8 @@ type Metrics struct { // // Storage Miner type SectorBuilder struct { - Path string + Path string // TODO: remove // FORK (-ish) + Storage []fs.PathConfig WorkerCount uint DisableLocalPreCommit bool diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index e2c1b44c0..8981b10c3 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -14,6 +14,7 @@ import ( storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" @@ -62,7 +63,7 @@ func GetParams(sbc *sectorbuilder.Config) error { return nil } -func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { +func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { @@ -74,9 +75,11 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit return nil, err } - sp, err := homedir.Expand(storagePath) - if err != nil { - return nil, err + for i := range storage { + storage[i].Path, err = homedir.Expand(storage[i].Path) + if err != nil { + return nil, err + } } if threads > math.MaxUint8 { @@ -91,7 +94,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit NoPreCommit: noprecommit, NoCommit: nocommit, - Paths: sectorbuilder.SimplePath(sp), + Paths: storage, } return sb, nil From 4b8b79dbe0ed29d3b8669f7a89ea002335810f38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 22:25:06 +0100 Subject: [PATCH 03/17] sealing: FinalizeSector step --- api/api_storage.go | 2 +- storage/sealing/fsm.go | 8 +++++++- storage/sealing/fsm_events.go | 8 ++++++++ storage/sealing/states.go | 8 ++++++++ 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 77cf32a84..7f6894c75 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -22,11 +22,11 @@ const ( WaitSeed // waiting for seed Committing CommitWait // waiting for message to land on chain + FinalizeSector Proving _ // reserved _ _ - _ // recovery handling // Reseal diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index ad0803488..4e48be15e 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -48,10 +48,14 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ ), api.Committing: planCommitting, api.CommitWait: planOne( - on(SectorProving{}, api.Proving), + on(SectorProving{}, api.FinalizeSector), on(SectorCommitFailed{}, api.CommitFailed), ), + api.FinalizeSector: planOne( + on(SectorFinalized{}, api.Proving), + ), + api.Proving: planOne( on(SectorFaultReported{}, api.FaultReported), on(SectorFaulty{}, api.Faulty), @@ -150,6 +154,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleCommitting, nil case api.CommitWait: return m.handleCommitWait, nil + case api.FinalizeSector: + case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", state.SectorID) diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index ee4963750..84b1120c8 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -120,6 +120,14 @@ type SectorProving struct{} func (evt SectorProving) apply(*SectorInfo) {} +type SectorFinalized struct{} + +func (evt SectorFinalized) apply(*SectorInfo) {} + +type SectorFinalizeFailed struct{ error } + +func (evt SectorFinalizeFailed) apply(*SectorInfo) {} + // Failed state recovery type SectorRetrySeal struct{} diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 519245e03..22a7f642b 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -232,6 +232,14 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorProving{}) } +func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { + if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { + return ctx.Send(SectorCommitFailed{err}) + } + + return ctx.Send(SectorFinalized{}) +} + func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { // TODO: check if the fault has already been reported, and that this sector is even valid From e2b2026fa5f130b705329e5de317d8fc0af88b7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 23:37:31 +0100 Subject: [PATCH 04/17] actually call finalizeSector --- storage/sealing/fsm.go | 2 +- storage/sealing/states.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 4e48be15e..2355c68bd 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -155,7 +155,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta case api.CommitWait: return m.handleCommitWait, nil case api.FinalizeSector: - + return m.handleFinalizeSector, nil case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", state.SectorID) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 22a7f642b..5d70533bc 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -233,8 +233,14 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { + // TODO: Maybe wait for some finality + if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { - return ctx.Send(SectorCommitFailed{err}) + return ctx.Send(SectorCommitFailed{xerrors.Errorf("finalize sector: %w", err)}) + } + + if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil { + return ctx.Send(SectorCommitFailed{xerrors.Errorf("drop staged: %w", err)}) } return ctx.Send(SectorFinalized{}) From 03f07042ff7d05213a07b973ff7abf38250bb210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 29 Jan 2020 23:47:28 +0100 Subject: [PATCH 05/17] fpost: print message apply result --- api/api_storage.go | 1 + storage/fpost_run.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/api/api_storage.go b/api/api_storage.go index 7f6894c75..5c4362929 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -64,6 +64,7 @@ var SectorStates = []string{ WaitSeed: "WaitSeed", Committing: "Committing", CommitWait: "CommitWait", + FinalizeSector: "FinalizeSector", Proving: "Proving", SealFailed: "SealFailed", diff --git a/storage/fpost_run.go b/storage/fpost_run.go index 1e23bb175..6dd3a8fe1 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -211,5 +211,19 @@ func (s *fpostScheduler) submitPost(ctx context.Context, proof *actors.SubmitFal log.Infof("Submitted fallback post: %s", sm.Cid()) + go func() { + rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid()) + if err != nil { + log.Error(err) + return + } + + if rec.Receipt.ExitCode == 0 { + return + } + + log.Errorf("Submitting fallback post %s failed: exit %d", sm.Cid(), rec.Receipt.ExitCode) + }() + return nil } From 4aaa7585437d86cbe1533d086aa169aa5d23169c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jan 2020 01:50:58 +0100 Subject: [PATCH 06/17] fpost: better fault checks --- api/api_full.go | 1 + api/apistruct/struct.go | 5 +++ chain/stmgr/utils.go | 17 ++++++++ node/impl/full/state.go | 4 ++ storage/fpost_run.go | 97 +++++++++++++++++++++++++++-------------- storage/miner.go | 1 + 6 files changed, 93 insertions(+), 32 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index 80edf385b..f22c322e4 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -108,6 +108,7 @@ type FullNode interface { StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) StateListMiners(context.Context, *types.TipSet) ([]address.Address, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 1a6e09421..b747b34a4 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -99,6 +99,7 @@ type FullNodeStruct struct { StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"` + StateMinerFaults func(context.Context, address.Address, *types.TipSet) ([]uint64, error) `perm:"read"` StateCall func(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) `perm:"read"` StateReplay func(context.Context, *types.TipSet, cid.Cid) (*api.ReplayResults, error) `perm:"read"` StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` @@ -410,6 +411,10 @@ func (c *FullNodeStruct) StateMinerSectorSize(ctx context.Context, actor address return c.Internal.StateMinerSectorSize(ctx, actor, ts) } +func (c *FullNodeStruct) StateMinerFaults(ctx context.Context, actor address.Address, ts *types.TipSet) ([]uint64, error) { + return c.Internal.StateMinerFaults(ctx, actor, ts) +} + func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) { return c.Internal.StateCall(ctx, msg, ts) } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 1fa84b4d2..5480dc949 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -2,6 +2,8 @@ package stmgr import ( "context" + amt2 "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/chain/actors/aerrors" ffi "github.com/filecoin-project/filecoin-ffi" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" @@ -253,6 +255,21 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma return mas.SlashedAt, nil } +func GetMinerFaults(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]uint64, error) { + var mas actors.StorageMinerActorState + _, err := sm.LoadActorState(ctx, maddr, &mas, ts) + if err != nil { + return nil, xerrors.Errorf("(get ssize) failed to load miner actor state: %w", err) + } + + ss, lerr := amt2.LoadAMT(amt.WrapBlockstore(sm.cs.Blockstore()), mas.Sectors) + if lerr != nil { + return nil, aerrors.HandleExternalError(lerr, "could not load proving set node") + } + + return mas.FaultSet.All(2 * ss.Count) +} + func GetStorageDeal(ctx context.Context, sm *StateManager, dealId uint64, ts *types.TipSet) (*actors.OnChainDeal, error) { var state actors.StorageMarketState if _, err := sm.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 3ba0dfbf8..d8af4abbf 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -84,6 +84,10 @@ func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Addre return stmgr.GetMinerSectorSize(ctx, a.StateManager, ts, actor) } +func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, ts *types.TipSet) ([]uint64, error) { + return stmgr.GetMinerFaults(ctx, a.StateManager, ts, addr) +} + func (a *StateAPI) StatePledgeCollateral(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { param, err := actors.SerializeParams(&actors.PledgeCollateralParams{Size: types.NewInt(0)}) if err != nil { diff --git a/storage/fpost_run.go b/storage/fpost_run.go index 6dd3a8fe1..3c09b76b0 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -50,52 +50,85 @@ func (s *fpostScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe }() } +func (s *fpostScheduler) declareFaults(ctx context.Context, fc uint64, params *actors.DeclareFaultsParams) error { + log.Warnf("DECLARING %d FAULTS", fc) + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + From: s.worker, + Method: actors.MAMethods.DeclareFaults, + Params: enc, + Value: types.NewInt(0), + GasLimit: types.NewInt(10000000), // i dont know help + GasPrice: types.NewInt(1), + } + + sm, err := s.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing faults message to mpool: %w", err) + } + + rec, err := s.api.StateWaitMsg(ctx, sm.Cid()) + if err != nil { + return xerrors.Errorf("waiting for declare faults: %w", err) + } + + if rec.Receipt.ExitCode != 0 { + return xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode) + } + + log.Infof("Faults declared successfully") + return nil +} + func (s *fpostScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) { faults := s.sb.Scrub(ssi) - var faultIDs []uint64 + + declaredFaults := map[uint64]struct{}{} + + { + chainFaults, err := s.api.StateMinerFaults(ctx, s.actor, nil) + if err != nil { + return nil, xerrors.Errorf("checking on-chain faults: %w", err) + } + + for _, fault := range chainFaults { + declaredFaults[fault] = struct{}{} + } + } if len(faults) > 0 { params := &actors.DeclareFaultsParams{Faults: types.NewBitField()} for _, fault := range faults { - log.Warnf("fault detected: sector %d: %s", fault.SectorID, fault.Err) - faultIDs = append(faultIDs, fault.SectorID) + if _, ok := declaredFaults[fault.SectorID]; ok { + continue + } - // TODO: omit already declared (with finality in mind though..) + log.Warnf("new fault detected: sector %d: %s", fault.SectorID, fault.Err) + declaredFaults[fault.SectorID] = struct{}{} params.Faults.Set(fault.SectorID) } - log.Warnf("DECLARING %d FAULTS", len(faults)) - - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr) - } - - msg := &types.Message{ - To: s.actor, - From: s.worker, - Method: actors.MAMethods.DeclareFaults, - Params: enc, - Value: types.NewInt(0), - GasLimit: types.NewInt(10000000), // i dont know help - GasPrice: types.NewInt(1), - } - - sm, err := s.api.MpoolPushMessage(ctx, msg) + pc, err := params.Faults.Count() if err != nil { - return nil, xerrors.Errorf("pushing faults message to mpool: %w", err) + return nil, xerrors.Errorf("counting faults: %w", err) } + if pc > 0 { + if err := s.declareFaults(ctx, pc, params); err != nil { + return nil, err + } + } + } - rec, err := s.api.StateWaitMsg(ctx, sm.Cid()) - if err != nil { - return nil, xerrors.Errorf("waiting for declare faults: %w", err) - } - - if rec.Receipt.ExitCode != 0 { - return nil, xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode) - } - log.Infof("Faults declared successfully") + faultIDs := make([]uint64, 0, len(declaredFaults)) + for fault := range declaredFaults { + faultIDs = append(faultIDs, fault) } return faultIDs, nil diff --git a/storage/miner.go b/storage/miner.go index 06eb2c47c..4772dcff6 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -50,6 +50,7 @@ type storageMinerApi interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) From cec208579027ebfd17a87919f4dd719944dbad3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jan 2020 07:41:30 +0100 Subject: [PATCH 07/17] sealing: Fix pledgeReader --- storage/sealing/garbage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 2173759d2..22a282229 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -5,6 +5,7 @@ import ( "context" "io" "math" + "math/bits" "math/rand" "runtime" @@ -16,6 +17,8 @@ import ( ) func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) readers := make([]io.Reader, parts) From 316b11445eb3e7cee29f2442ff035f0e426f49a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 30 Jan 2020 19:38:03 +0100 Subject: [PATCH 08/17] storageminer: Print fault data in info --- cmd/lotus-storage-miner/info.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index a7c8b0d58..6ad4b08ba 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -57,8 +57,20 @@ var infoCmd = &cli.Command{ if err != nil { return err } + faults, err := api.StateMinerFaults(ctx, maddr, nil) + if err != nil { + return err + } + fmt.Printf("\tCommitted: %s\n", types.BigMul(types.NewInt(secCounts.Sset), types.NewInt(sizeByte)).SizeStr()) - fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr()) + if len(faults) == 0 { + fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr()) + } else { + fmt.Printf("\tProving: %s (%s Faulty, %.2f%%)\n", + types.BigMul(types.NewInt(secCounts.Pset - uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), + types.BigMul(types.NewInt(uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), + float64(10000 * uint64(len(faults)) / secCounts.Pset) / 100.) + } // TODO: indicate whether the post worker is in use wstat, err := nodeApi.WorkerStats(ctx) From ee9060aa14bd87474370cc96d9bbce9966954b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 02:18:48 +0100 Subject: [PATCH 09/17] Update sectorbuilder --- go.mod | 4 +--- go.sum | 2 ++ storage/sbmock/sbmock.go | 27 ++++++++++++++++++++++++--- storage/sealing/utils.go | 3 +++ storage/sealing/utils_test.go | 7 +++++++ 5 files changed, 37 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 46149dafd..141a3b176 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 + github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c github.com/filecoin-project/go-statestore v0.1.0 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect @@ -113,5 +113,3 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 - -replace github.com/filecoin-project/go-sectorbuilder => /home/magik6k/gohack/github.com/filecoin-project/go-sectorbuilder diff --git a/go.sum b/go.sum index 3b9b9a133..50d3bd6e9 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifo github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 h1:XChPRKPZL+/N6a3ccLmjCJ7JrR+SFLFJDllv0BkxW4I= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c h1:qv1tEab/IklFknEM8VK2WgxxM7aZ5/uwm5xFgvHTp4A= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c/go.mod h1:jNGVCDihkMFnraYVLH1xl4ceZQVxx/u4dOORrTKeRi0= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/storage/sbmock/sbmock.go b/storage/sbmock/sbmock.go index f28df2395..7f28e4290 100644 --- a/storage/sbmock/sbmock.go +++ b/storage/sbmock/sbmock.go @@ -13,6 +13,7 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "golang.org/x/xerrors" ) @@ -60,7 +61,7 @@ func (sb *SBMock) RateLimit() func() { } } -func (sb *SBMock) AddPiece(size uint64, sectorId uint64, r io.Reader, existingPieces []uint64) (sectorbuilder.PublicPieceInfo, error) { +func (sb *SBMock) AddPiece(ctx context.Context, size uint64, sectorId uint64, r io.Reader, existingPieces []uint64) (sectorbuilder.PublicPieceInfo, error) { sb.lk.Lock() ss, ok := sb.sectors[sectorId] if !ok { @@ -284,7 +285,7 @@ func (sb *SBMock) GenerateEPostCandidates(sectorInfo sectorbuilder.SortedPublicS return out, nil } -func (sb *SBMock) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { +func (sb *SBMock) ReadPieceFromSealedSector(ctx context.Context, sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { if len(sb.sectors[sectorID].pieces) > 1 { panic("implme") } @@ -301,7 +302,7 @@ func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, erro buf := make([]byte, usize) rand.Read(buf) - pi, err := sb.AddPiece(usize, sid, bytes.NewReader(buf), nil) + pi, err := sb.AddPiece(context.TODO(), usize, sid, bytes.NewReader(buf), nil) if err != nil { return 0, nil, err } @@ -309,6 +310,26 @@ func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, erro return sid, []sectorbuilder.PublicPieceInfo{pi}, nil } +func (sb *SBMock) FinalizeSector(context.Context, uint64) error { + panic("implement me") +} + +func (sb *SBMock) DropStaged(context.Context, uint64) error { + panic("implement me") +} + +func (sb *SBMock) SectorPath(typ fs.DataType, sectorID uint64) (fs.SectorPath, error) { + panic("implement me") +} + +func (sb *SBMock) AllocSectorPath(typ fs.DataType, sectorID uint64, cache bool) (fs.SectorPath, error) { + panic("implement me") +} + +func (sb *SBMock) ReleaseSector(fs.DataType, fs.SectorPath) { + panic("implement me") +} + func (m mockVerif) VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo sectorbuilder.SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []sectorbuilder.EPostCandidate, proverID address.Address) (bool, error) { panic("implement me") } diff --git a/storage/sealing/utils.go b/storage/sealing/utils.go index 21d6b76bf..f317301ba 100644 --- a/storage/sealing/utils.go +++ b/storage/sealing/utils.go @@ -49,6 +49,9 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + if size / parts < 127 { + parts = size / 127 + } piece := sectorbuilder.UserBytesForSectorSize((size + size / 127) / parts) out := make([]sectorbuilder.PublicPieceInfo, parts) diff --git a/storage/sealing/utils_test.go b/storage/sealing/utils_test.go index 9f9ca3880..94bf858c1 100644 --- a/storage/sealing/utils_test.go +++ b/storage/sealing/utils_test.go @@ -52,4 +52,11 @@ func TestFastPledge(t *testing.T) { if _, err := s.fastPledgeCommitment(sectorbuilder.UserBytesForSectorSize(sz), 5); err != nil { t.Fatalf("%+v", err) } + + sz = uint64(1024) + + s = Sealing{sb: sbmock.NewMockSectorBuilder(0, sz)} + if _, err := s.fastPledgeCommitment(sectorbuilder.UserBytesForSectorSize(sz), 64); err != nil { + t.Fatalf("%+v", err) + } } From 95f344540ec6ce0f6c2be4394f444e49c7d1f291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 02:27:38 +0100 Subject: [PATCH 10/17] Fix tests --- cmd/lotus-storage-miner/info.go | 4 ++-- node/node_test.go | 2 +- storage/sbmock/sbmock.go | 4 ++-- storage/sealing/fsm_test.go | 6 ++++++ storage/sealing/utils.go | 4 ++-- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index 6ad4b08ba..e1b4f4b32 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -67,9 +67,9 @@ var infoCmd = &cli.Command{ fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr()) } else { fmt.Printf("\tProving: %s (%s Faulty, %.2f%%)\n", - types.BigMul(types.NewInt(secCounts.Pset - uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), + types.BigMul(types.NewInt(secCounts.Pset-uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), types.BigMul(types.NewInt(uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), - float64(10000 * uint64(len(faults)) / secCounts.Pset) / 100.) + float64(10000*uint64(len(faults))/secCounts.Pset)/100.) } // TODO: indicate whether the post worker is in use diff --git a/node/node_test.go b/node/node_test.go index b9c1f778e..0ac3a17ab 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -232,7 +232,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te SectorSize: 1024, WorkerThreads: 2, Miner: genMiner, - Dir: psd, + Paths: sectorbuilder.SimplePath(psd), }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { t.Fatal(err) diff --git a/storage/sbmock/sbmock.go b/storage/sbmock/sbmock.go index 7f28e4290..4d88853f5 100644 --- a/storage/sbmock/sbmock.go +++ b/storage/sbmock/sbmock.go @@ -311,11 +311,11 @@ func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, erro } func (sb *SBMock) FinalizeSector(context.Context, uint64) error { - panic("implement me") + return nil } func (sb *SBMock) DropStaged(context.Context, uint64) error { - panic("implement me") + return nil } func (sb *SBMock) SectorPath(typ fs.DataType, sectorID uint64) (fs.SectorPath, error) { diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index 7430bb634..24145a2a1 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -50,6 +50,9 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.FinalizeSector) + + m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, api.Proving) } @@ -81,6 +84,9 @@ func TestSeedRevert(t *testing.T) { require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.FinalizeSector) + + m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, api.Proving) } diff --git a/storage/sealing/utils.go b/storage/sealing/utils.go index f317301ba..b2221b278 100644 --- a/storage/sealing/utils.go +++ b/storage/sealing/utils.go @@ -49,11 +49,11 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 - if size / parts < 127 { + if size/parts < 127 { parts = size / 127 } - piece := sectorbuilder.UserBytesForSectorSize((size + size / 127) / parts) + piece := sectorbuilder.UserBytesForSectorSize((size + size/127) / parts) out := make([]sectorbuilder.PublicPieceInfo, parts) var lk sync.Mutex From 1b6ad6d2a125bbd9f2fc45d134d6abb7532ff98a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 19:56:48 +0100 Subject: [PATCH 11/17] worker: Fix transfer urls --- cmd/lotus-seal-worker/transfer.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index fcd473392..e9e8e760f 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -1,12 +1,14 @@ package main import ( + "fmt" "io" "mime" "net/http" "os" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" files "github.com/ipfs/go-ipfs-files" "golang.org/x/xerrors" "gopkg.in/cheggaaa/pb.v1" @@ -26,7 +28,7 @@ func (w *worker) sizeForType(typ string) int64 { func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID) log.Infof("Fetch %s %s", typ, url) req, err := http.NewRequest("GET", url, nil) @@ -76,21 +78,24 @@ func (w *worker) fetch(typ string, sectorID uint64) error { } func (w *worker) push(typ string, sectorID uint64) error { - filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID) + if err != nil { + return err + } - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID) log.Infof("Push %s %s", typ, url) - stat, err := os.Stat(filename) + stat, err := os.Stat(string(filename)) if err != nil { return err } var r io.Reader if stat.IsDir() { - r, err = tarutil.TarDirectory(filename) + r, err = tarutil.TarDirectory(string(filename)) } else { - r, err = os.OpenFile(filename, os.O_RDONLY, 0644) + r, err = os.OpenFile(string(filename), os.O_RDONLY, 0644) } if err != nil { return xerrors.Errorf("opening push reader: %w", err) From 87f7315f5a315cf69f6f2f97ef6264214e34d6a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 19:57:28 +0100 Subject: [PATCH 12/17] mod tidy --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 50d3bd6e9..2982c0d6e 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,6 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go. github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 h1:XChPRKPZL+/N6a3ccLmjCJ7JrR+SFLFJDllv0BkxW4I= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c h1:qv1tEab/IklFknEM8VK2WgxxM7aZ5/uwm5xFgvHTp4A= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c/go.mod h1:jNGVCDihkMFnraYVLH1xl4ceZQVxx/u4dOORrTKeRi0= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= From b6ba0ed47c51376b4dc0de9ab8163eb934f23f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 20:07:20 +0100 Subject: [PATCH 13/17] fix pledgeReader with small sectors --- storage/sealing/garbage.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 22a282229..ac49ac668 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -18,6 +18,9 @@ import ( func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + if size/parts < 127 { + parts = size / 127 + } piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) From 37929ff75fd53e6f00e24027300d2e920afb09c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 31 Jan 2020 20:22:31 +0100 Subject: [PATCH 14/17] sealing: more logging in pledge sector flow --- storage/sealing/garbage.go | 9 +++++++-- storage/sealing/sealing.go | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index ac49ac668..3274f6aec 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -37,6 +37,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie return nil, nil } + log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) + deals := make([]actors.StorageDealProposal, len(sizes)) for i, size := range sizes { commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU())) @@ -59,6 +61,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie deals[i] = sdp } + log.Infof("Publishing deals for %d", sectorID) + params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{ Deals: deals, }) @@ -78,7 +82,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie if err != nil { return nil, err } - r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) + r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) // TODO: more finality if err != nil { return nil, err } @@ -93,8 +97,9 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") } - out := make([]Piece, len(sizes)) + log.Infof("Deals for sector %d: %+v", sectorID, resp.DealIDs) + out := make([]Piece, len(sizes)) for i, size := range sizes { ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) if err != nil { diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index e6e45c0eb..6e562001d 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -121,6 +121,7 @@ func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, secto } func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { + log.Infof("Start sealing %d", sid) return m.sectors.Send(sid, SectorStart{ id: sid, pieces: []Piece{ From e453a388655baade3c8c1ed95c0bfaee40e06810 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 2 Feb 2020 20:36:15 +0100 Subject: [PATCH 15/17] sealing: fix finalize with cache only --- storage/sealing/states.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 5d70533bc..723a2d843 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -4,6 +4,7 @@ import ( "context" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -236,11 +237,14 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn // TODO: Maybe wait for some finality if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("finalize sector: %w", err)}) + if !xerrors.Is(err, fs.ErrNoSuitablePath) { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) + } + log.Warnf("finalize sector: %v", err) } if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil { - return ctx.Send(SectorCommitFailed{xerrors.Errorf("drop staged: %w", err)}) + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)}) } return ctx.Send(SectorFinalized{}) From 8da64d710e2d9c86d2f6133381aa18805b3d3e8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 3 Feb 2020 04:48:56 +0100 Subject: [PATCH 16/17] sealing: Work around broken remote sector put --- node/impl/storminer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 961d27a70..7a60119c5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -107,7 +107,8 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id) + // This is going to get better with worker-to-worker transfers + path, err := sm.SectorBuilder.AllocSectorPath(fs.DataType(vars["type"]), id, true) if err != nil { log.Error(err) w.WriteHeader(500) From ff77198a08a5fe444deec20364e2b29b23f91ea0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 3 Feb 2020 18:37:40 +0100 Subject: [PATCH 17/17] Update sectorbuilder with later cache mkdir --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 141a3b176..50d7d8fb7 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c + github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62 github.com/filecoin-project/go-statestore v0.1.0 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect diff --git a/go.sum b/go.sum index 2982c0d6e..cd6e097f5 100644 --- a/go.sum +++ b/go.sum @@ -119,6 +119,8 @@ github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifo github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c h1:qv1tEab/IklFknEM8VK2WgxxM7aZ5/uwm5xFgvHTp4A= github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200131010043-6b57024f839c/go.mod h1:jNGVCDihkMFnraYVLH1xl4ceZQVxx/u4dOORrTKeRi0= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62 h1:/+xdjMkIdiRs6vA2lJU56iqtEcl9BQgYXi8b2KuuYCg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62/go.mod h1:jNGVCDihkMFnraYVLH1xl4ceZQVxx/u4dOORrTKeRi0= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=