diff --git a/api/api_full.go b/api/api_full.go index 64d0dc622..3ebb6b0a7 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -109,6 +109,7 @@ type FullNode interface { StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) + StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) StateListMiners(context.Context, *types.TipSet) ([]address.Address, error) diff --git a/api/api_storage.go b/api/api_storage.go index 77cf32a84..5c4362929 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -22,11 +22,11 @@ const ( WaitSeed // waiting for seed Committing CommitWait // waiting for message to land on chain + FinalizeSector Proving _ // reserved _ _ - _ // recovery handling // Reseal @@ -64,6 +64,7 @@ var SectorStates = []string{ WaitSeed: "WaitSeed", Committing: "Committing", CommitWait: "CommitWait", + FinalizeSector: "FinalizeSector", Proving: "Proving", SealFailed: "SealFailed", diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 16eaabfa8..11808013d 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -100,6 +100,7 @@ type FullNodeStruct struct { StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"` StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"` StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"` + StateMinerFaults func(context.Context, address.Address, *types.TipSet) ([]uint64, error) `perm:"read"` StateCall func(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) `perm:"read"` StateReplay func(context.Context, *types.TipSet, cid.Cid) (*api.ReplayResults, error) `perm:"read"` StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` @@ -417,6 +418,10 @@ func (c *FullNodeStruct) StateMinerSectorSize(ctx context.Context, actor address return c.Internal.StateMinerSectorSize(ctx, actor, ts) } +func (c *FullNodeStruct) StateMinerFaults(ctx context.Context, actor address.Address, ts *types.TipSet) ([]uint64, error) { + return c.Internal.StateMinerFaults(ctx, actor, ts) +} + func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) { return c.Internal.StateCall(ctx, msg, ts) } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 1fa84b4d2..5480dc949 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -2,6 +2,8 @@ package stmgr import ( "context" + amt2 "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/chain/actors/aerrors" ffi "github.com/filecoin-project/filecoin-ffi" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" @@ -253,6 +255,21 @@ func GetMinerSlashed(ctx context.Context, sm *StateManager, ts *types.TipSet, ma return mas.SlashedAt, nil } +func GetMinerFaults(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address) ([]uint64, error) { + var mas actors.StorageMinerActorState + _, err := sm.LoadActorState(ctx, maddr, &mas, ts) + if err != nil { + return nil, xerrors.Errorf("(get ssize) failed to load miner actor state: %w", err) + } + + ss, lerr := amt2.LoadAMT(amt.WrapBlockstore(sm.cs.Blockstore()), mas.Sectors) + if lerr != nil { + return nil, aerrors.HandleExternalError(lerr, "could not load proving set node") + } + + return mas.FaultSet.All(2 * ss.Count) +} + func GetStorageDeal(ctx context.Context, sm *StateManager, dealId uint64, ts *types.TipSet) (*actors.OnChainDeal, error) { var state actors.StorageMarketState if _, err := sm.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index cb25bb23a..cd222c800 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -146,7 +146,7 @@ func main() { Miner: maddr, SectorSize: sectorSize, WorkerThreads: 2, - Dir: sbdir, + Paths: sectorbuilder.SimplePath(sbdir), } if robench == "" { @@ -174,7 +174,7 @@ func main() { r := rand.New(rand.NewSource(100 + int64(i))) - pi, err := sb.AddPiece(dataSize, i, r, nil) + pi, err := sb.AddPiece(context.TODO(), dataSize, i, r, nil) if err != nil { return err } @@ -225,7 +225,7 @@ func main() { if !c.Bool("skip-unseal") { log.Info("Unsealing sector") - rc, err := sb.ReadPieceFromSealedSector(1, 0, dataSize, ticket.TicketBytes[:], commD[:]) + rc, err := sb.ReadPieceFromSealedSector(context.TODO(), 1, 0, dataSize, ticket.TicketBytes[:], commD[:]) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index bf91c97d8..c40a4f8bc 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -41,7 +41,6 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) { go subMpool(ctx, api, st) go subBlocks(ctx, api, st) } - } } }() diff --git a/cmd/lotus-seal-worker/sub.go b/cmd/lotus-seal-worker/sub.go index bf45c685b..ec5d88870 100644 --- a/cmd/lotus-seal-worker/sub.go +++ b/cmd/lotus-seal-worker/sub.go @@ -35,7 +35,7 @@ func acceptJobs(ctx context.Context, api lapi.StorageMiner, endpoint string, aut SectorSize: ssize, Miner: act, WorkerThreads: 1, - Dir: repo, + Paths: sectorbuilder.SimplePath(repo), }) if err != nil { return err diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index fcd473392..e9e8e760f 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -1,12 +1,14 @@ package main import ( + "fmt" "io" "mime" "net/http" "os" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" files "github.com/ipfs/go-ipfs-files" "golang.org/x/xerrors" "gopkg.in/cheggaaa/pb.v1" @@ -26,7 +28,7 @@ func (w *worker) sizeForType(typ string) int64 { func (w *worker) fetch(typ string, sectorID uint64) error { outname := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID) log.Infof("Fetch %s %s", typ, url) req, err := http.NewRequest("GET", url, nil) @@ -76,21 +78,24 @@ func (w *worker) fetch(typ string, sectorID uint64) error { } func (w *worker) push(typ string, sectorID uint64) error { - filename := filepath.Join(w.repo, typ, w.sb.SectorName(sectorID)) + filename, err := w.sb.SectorPath(fs.DataType(typ), sectorID) + if err != nil { + return err + } - url := w.minerEndpoint + "/remote/" + typ + "/" + w.sb.SectorName(sectorID) + url := w.minerEndpoint + "/remote/" + typ + "/" + fmt.Sprint(sectorID) log.Infof("Push %s %s", typ, url) - stat, err := os.Stat(filename) + stat, err := os.Stat(string(filename)) if err != nil { return err } var r io.Reader if stat.IsDir() { - r, err = tarutil.TarDirectory(filename) + r, err = tarutil.TarDirectory(string(filename)) } else { - r, err = os.OpenFile(filename, os.O_RDONLY, 0644) + r, err = os.OpenFile(string(filename), os.O_RDONLY, 0644) } if err != nil { return xerrors.Errorf("opening push reader: %w", err) diff --git a/cmd/lotus-seed/main.go b/cmd/lotus-seed/main.go index f3f529af0..b30f523c9 100644 --- a/cmd/lotus-seed/main.go +++ b/cmd/lotus-seed/main.go @@ -196,7 +196,7 @@ var aggregateSectorDirsCmd = &cli.Command{ agsb, err := sectorbuilder.New(§orbuilder.Config{ Miner: maddr, SectorSize: ssize, - Dir: destdir, + Paths: sectorbuilder.SimplePath(destdir), WorkerThreads: 2, }, namespace.Wrap(agmds, datastore.NewKey("/sectorbuilder"))) if err != nil { @@ -257,7 +257,7 @@ var aggregateSectorDirsCmd = &cli.Command{ sb, err := sectorbuilder.New(§orbuilder.Config{ Miner: maddr, SectorSize: genm.SectorSize, - Dir: dir, + Paths: sectorbuilder.SimplePath(dir), WorkerThreads: 2, }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { diff --git a/cmd/lotus-seed/seed/seed.go b/cmd/lotus-seed/seed/seed.go index d4c00e794..57fbd9e19 100644 --- a/cmd/lotus-seed/seed/seed.go +++ b/cmd/lotus-seed/seed/seed.go @@ -32,7 +32,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb Miner: maddr, SectorSize: ssize, FallbackLastID: offset, - Dir: sbroot, + Paths: sectorbuilder.SimplePath(sbroot), WorkerThreads: 2, } @@ -59,7 +59,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb return nil, err } - pi, err := sb.AddPiece(size, sid, rand.Reader, nil) + pi, err := sb.AddPiece(context.TODO(), size, sid, rand.Reader, nil) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func PreSeal(maddr address.Address, ssize uint64, offset uint64, sectors int, sb return nil, xerrors.Errorf("commit: %w", err) } - if err := sb.TrimCache(sid); err != nil { + if err := sb.TrimCache(context.TODO(), sid); err != nil { return nil, xerrors.Errorf("trim cache: %w", err) } diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index e2d83b1ee..aeda0cf89 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -57,8 +57,20 @@ var infoCmd = &cli.Command{ if err != nil { return err } + faults, err := api.StateMinerFaults(ctx, maddr, nil) + if err != nil { + return err + } + fmt.Printf("\tCommitted: %s\n", types.BigMul(types.NewInt(secCounts.Sset), types.NewInt(sizeByte)).SizeStr()) - fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr()) + if len(faults) == 0 { + fmt.Printf("\tProving: %s\n", types.BigMul(types.NewInt(secCounts.Pset), types.NewInt(sizeByte)).SizeStr()) + } else { + fmt.Printf("\tProving: %s (%s Faulty, %.2f%%)\n", + types.BigMul(types.NewInt(secCounts.Pset-uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), + types.BigMul(types.NewInt(uint64(len(faults))), types.NewInt(sizeByte)).SizeStr(), + float64(10000*uint64(len(faults))/secCounts.Pset)/100.) + } // TODO: indicate whether the post worker is in use wstat, err := nodeApi.WorkerStats(ctx) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index b40b2e04e..5c888aa2e 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -177,7 +177,7 @@ var initCmd = &cli.Command{ oldsb, err := sectorbuilder.New(§orbuilder.Config{ SectorSize: ssize, WorkerThreads: 2, - Dir: pssb, + Paths: sectorbuilder.SimplePath(pssb), }, namespace.Wrap(oldmds, datastore.NewKey("/sectorbuilder"))) if err != nil { return xerrors.Errorf("failed to open up preseal sectorbuilder: %w", err) @@ -186,7 +186,7 @@ var initCmd = &cli.Command{ nsb, err := sectorbuilder.New(§orbuilder.Config{ SectorSize: ssize, WorkerThreads: 2, - Dir: lr.Path(), + Paths: sectorbuilder.SimplePath(lr.Path()), }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { return xerrors.Errorf("failed to open up sectorbuilder: %w", err) @@ -369,7 +369,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode, return err } - sbcfg, err := modules.SectorBuilderConfig(lr.Path(), 2, false, false)(mds, api) + sbcfg, err := modules.SectorBuilderConfig(sectorbuilder.SimplePath(lr.Path()), 2, false, false)(mds, api) if err != nil { return xerrors.Errorf("getting genesis miner sector builder config: %w", err) } diff --git a/go.mod b/go.mod index d261af44c..50d7d8fb7 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 github.com/filecoin-project/go-paramfetch v0.0.1 - github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 + github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62 github.com/filecoin-project/go-statestore v0.1.0 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/go-ole/go-ole v1.2.4 // indirect diff --git a/go.sum b/go.sum index 3b9b9a133..29beb8ca5 100644 --- a/go.sum +++ b/go.sum @@ -117,8 +117,8 @@ github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go. github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE= github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55 h1:XChPRKPZL+/N6a3ccLmjCJ7JrR+SFLFJDllv0BkxW4I= -github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200123143044-d9cc96c53c55/go.mod h1:ahsryULdwYoZ94K09HcfqX3QBwevWVldENSV/EdCbNg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62 h1:/+xdjMkIdiRs6vA2lJU56iqtEcl9BQgYXi8b2KuuYCg= +github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200203173614-42d67726bb62/go.mod h1:jNGVCDihkMFnraYVLH1xl4ceZQVxx/u4dOORrTKeRi0= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/node/builder.go b/node/builder.go index 5ec6e4183..028778c09 100644 --- a/node/builder.go +++ b/node/builder.go @@ -7,6 +7,7 @@ import ( sectorbuilder "github.com/filecoin-project/go-sectorbuilder" blockstore "github.com/ipfs/go-ipfs-blockstore" + logging "github.com/ipfs/go-log" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -54,6 +55,8 @@ import ( "github.com/filecoin-project/lotus/storage/sectorblocks" ) +var log = logging.Logger("builder") + // special is a type used to give keys to modules which // can't really be identified by the returned type type special struct{ id int } @@ -342,15 +345,20 @@ func ConfigStorageMiner(c interface{}, lr repo.LockedRepo) Option { return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) } - path := cfg.SectorBuilder.Path - if path == "" { - path = lr.Path() + scfg := sectorbuilder.SimplePath(lr.Path()) + if cfg.SectorBuilder.Path == "" { + if len(cfg.SectorBuilder.Storage) > 0 { + scfg = cfg.SectorBuilder.Storage + } + } else { + scfg = sectorbuilder.SimplePath(cfg.SectorBuilder.Path) + log.Warn("LEGACY SectorBuilder.Path FOUND IN CONFIG. Please use the new storage config") } return Options( ConfigCommon(&cfg.Common), - Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(path, + Override(new(*sectorbuilder.Config), modules.SectorBuilderConfig(scfg, cfg.SectorBuilder.WorkerCount, cfg.SectorBuilder.DisableLocalPreCommit, cfg.SectorBuilder.DisableLocalCommit)), diff --git a/node/config/def.go b/node/config/def.go index 97ed036d5..43007b684 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -3,6 +3,8 @@ package config import ( "encoding" "time" + + "github.com/filecoin-project/go-sectorbuilder/fs" ) // Common is common config between full node and miner @@ -54,7 +56,8 @@ type Metrics struct { // // Storage Miner type SectorBuilder struct { - Path string + Path string // TODO: remove // FORK (-ish) + Storage []fs.PathConfig WorkerCount uint DisableLocalPreCommit bool diff --git a/node/impl/full/state.go b/node/impl/full/state.go index c505c6efc..02944e67f 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -85,6 +85,10 @@ func (a *StateAPI) StateMinerSectorSize(ctx context.Context, actor address.Addre return stmgr.GetMinerSectorSize(ctx, a.StateManager, ts, actor) } +func (a *StateAPI) StateMinerFaults(ctx context.Context, addr address.Address, ts *types.TipSet) ([]uint64, error) { + return stmgr.GetMinerFaults(ctx, a.StateManager, ts, addr) +} + func (a *StateAPI) StatePledgeCollateral(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { param, err := actors.SerializeParams(&actors.PledgeCollateralParams{Size: types.NewInt(0)}) if err != nil { diff --git a/node/impl/storminer.go b/node/impl/storminer.go index b600dfb7f..7a60119c5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -7,15 +7,16 @@ import ( "mime" "net/http" "os" - - "github.com/filecoin-project/lotus/api/apistruct" + "strconv" "github.com/gorilla/mux" files "github.com/ipfs/go-ipfs-files" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/lib/tarutil" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" @@ -43,8 +44,8 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { mux := mux.NewRouter() - mux.HandleFunc("/remote/{type}/{sname}", sm.remoteGetSector).Methods("GET") - mux.HandleFunc("/remote/{type}/{sname}", sm.remotePutSector).Methods("PUT") + mux.HandleFunc("/remote/{type}/{id}", sm.remoteGetSector).Methods("GET") + mux.HandleFunc("/remote/{type}/{id}", sm.remotePutSector).Methods("PUT") log.Infof("SERVEGETREMOTE %s", r.URL) @@ -54,14 +55,21 @@ func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) + id, err := strconv.ParseUint(vars["id"], 10, 64) + if err != nil { + log.Error("parsing sector id: ", err) + w.WriteHeader(500) + return + } + + path, err := sm.SectorBuilder.SectorPath(fs.DataType(vars["type"]), id) if err != nil { log.Error(err) w.WriteHeader(500) return } - stat, err := os.Stat(path) + stat, err := os.Stat(string(path)) if err != nil { log.Error(err) w.WriteHeader(500) @@ -70,10 +78,10 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques var rd io.Reader if stat.IsDir() { - rd, err = tarutil.TarDirectory(path) + rd, err = tarutil.TarDirectory(string(path)) w.Header().Set("Content-Type", "application/x-tar") } else { - rd, err = os.OpenFile(path, os.O_RDONLY, 0644) + rd, err = os.OpenFile(string(path), os.O_RDONLY, 0644) w.Header().Set("Content-Type", "application/octet-stream") } if err != nil { @@ -92,7 +100,15 @@ func (sm *StorageMinerAPI) remoteGetSector(w http.ResponseWriter, r *http.Reques func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - path, err := sm.SectorBuilder.GetPath(vars["type"], vars["sname"]) + id, err := strconv.ParseUint(vars["id"], 10, 64) + if err != nil { + log.Error("parsing sector id: ", err) + w.WriteHeader(500) + return + } + + // This is going to get better with worker-to-worker transfers + path, err := sm.SectorBuilder.AllocSectorPath(fs.DataType(vars["type"]), id, true) if err != nil { log.Error(err) w.WriteHeader(500) @@ -106,7 +122,7 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques return } - if err := os.RemoveAll(path); err != nil { + if err := os.RemoveAll(string(path)); err != nil { log.Error(err) w.WriteHeader(500) return @@ -114,13 +130,13 @@ func (sm *StorageMinerAPI) remotePutSector(w http.ResponseWriter, r *http.Reques switch mediatype { case "application/x-tar": - if err := tarutil.ExtractTar(r.Body, path); err != nil { + if err := tarutil.ExtractTar(r.Body, string(path)); err != nil { log.Error(err) w.WriteHeader(500) return } default: - if err := files.WriteTo(files.NewReaderFile(r.Body), path); err != nil { + if err := files.WriteTo(files.NewReaderFile(r.Body), string(path)); err != nil { log.Error(err) w.WriteHeader(500) return diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index dfeec2398..e42734297 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -14,6 +14,7 @@ import ( storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" paramfetch "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "github.com/filecoin-project/go-statestore" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" @@ -62,7 +63,7 @@ func GetParams(sbc *sectorbuilder.Config) error { return nil } -func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { +func SectorBuilderConfig(storage []fs.PathConfig, threads uint, noprecommit, nocommit bool) func(dtypes.MetadataDS, api.FullNode) (*sectorbuilder.Config, error) { return func(ds dtypes.MetadataDS, api api.FullNode) (*sectorbuilder.Config, error) { minerAddr, err := minerAddrFromDS(ds) if err != nil { @@ -74,9 +75,11 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit return nil, err } - sp, err := homedir.Expand(storagePath) - if err != nil { - return nil, err + for i := range storage { + storage[i].Path, err = homedir.Expand(storage[i].Path) + if err != nil { + return nil, err + } } if threads > math.MaxUint8 { @@ -91,7 +94,7 @@ func SectorBuilderConfig(storagePath string, threads uint, noprecommit, nocommit NoPreCommit: noprecommit, NoCommit: nocommit, - Dir: sp, + Paths: storage, } return sb, nil diff --git a/node/node_test.go b/node/node_test.go index b9c1f778e..0ac3a17ab 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -232,7 +232,7 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te SectorSize: 1024, WorkerThreads: 2, Miner: genMiner, - Dir: psd, + Paths: sectorbuilder.SimplePath(psd), }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) if err != nil { t.Fatal(err) diff --git a/storage/fpost_run.go b/storage/fpost_run.go index ca5321513..5cd711c7b 100644 --- a/storage/fpost_run.go +++ b/storage/fpost_run.go @@ -50,52 +50,85 @@ func (s *FPoStScheduler) doPost(ctx context.Context, eps uint64, ts *types.TipSe }() } +func (s *FPoStScheduler) declareFaults(ctx context.Context, fc uint64, params *actors.DeclareFaultsParams) error { + log.Warnf("DECLARING %d FAULTS", fc) + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return xerrors.Errorf("could not serialize declare faults parameters: %w", aerr) + } + + msg := &types.Message{ + To: s.actor, + From: s.worker, + Method: actors.MAMethods.DeclareFaults, + Params: enc, + Value: types.NewInt(0), + GasLimit: types.NewInt(10000000), // i dont know help + GasPrice: types.NewInt(1), + } + + sm, err := s.api.MpoolPushMessage(ctx, msg) + if err != nil { + return xerrors.Errorf("pushing faults message to mpool: %w", err) + } + + rec, err := s.api.StateWaitMsg(ctx, sm.Cid()) + if err != nil { + return xerrors.Errorf("waiting for declare faults: %w", err) + } + + if rec.Receipt.ExitCode != 0 { + return xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode) + } + + log.Infof("Faults declared successfully") + return nil +} + func (s *FPoStScheduler) checkFaults(ctx context.Context, ssi sectorbuilder.SortedPublicSectorInfo) ([]uint64, error) { faults := s.sb.Scrub(ssi) - var faultIDs []uint64 + + declaredFaults := map[uint64]struct{}{} + + { + chainFaults, err := s.api.StateMinerFaults(ctx, s.actor, nil) + if err != nil { + return nil, xerrors.Errorf("checking on-chain faults: %w", err) + } + + for _, fault := range chainFaults { + declaredFaults[fault] = struct{}{} + } + } if len(faults) > 0 { params := &actors.DeclareFaultsParams{Faults: types.NewBitField()} for _, fault := range faults { - log.Warnf("fault detected: sector %d: %s", fault.SectorID, fault.Err) - faultIDs = append(faultIDs, fault.SectorID) + if _, ok := declaredFaults[fault.SectorID]; ok { + continue + } - // TODO: omit already declared (with finality in mind though..) + log.Warnf("new fault detected: sector %d: %s", fault.SectorID, fault.Err) + declaredFaults[fault.SectorID] = struct{}{} params.Faults.Set(fault.SectorID) } - log.Warnf("DECLARING %d FAULTS", len(faults)) - - enc, aerr := actors.SerializeParams(params) - if aerr != nil { - return nil, xerrors.Errorf("could not serialize declare faults parameters: %w", aerr) - } - - msg := &types.Message{ - To: s.actor, - From: s.worker, - Method: actors.MAMethods.DeclareFaults, - Params: enc, - Value: types.NewInt(0), - GasLimit: types.NewInt(10000000), // i dont know help - GasPrice: types.NewInt(1), - } - - sm, err := s.api.MpoolPushMessage(ctx, msg) + pc, err := params.Faults.Count() if err != nil { - return nil, xerrors.Errorf("pushing faults message to mpool: %w", err) + return nil, xerrors.Errorf("counting faults: %w", err) } + if pc > 0 { + if err := s.declareFaults(ctx, pc, params); err != nil { + return nil, err + } + } + } - rec, err := s.api.StateWaitMsg(ctx, sm.Cid()) - if err != nil { - return nil, xerrors.Errorf("waiting for declare faults: %w", err) - } - - if rec.Receipt.ExitCode != 0 { - return nil, xerrors.Errorf("declare faults exit %d", rec.Receipt.ExitCode) - } - log.Infof("Faults declared successfully") + faultIDs := make([]uint64, 0, len(declaredFaults)) + for fault := range declaredFaults { + faultIDs = append(faultIDs, fault) } return faultIDs, nil @@ -211,5 +244,19 @@ func (s *FPoStScheduler) submitPost(ctx context.Context, proof *actors.SubmitFal log.Infof("Submitted fallback post: %s", sm.Cid()) + go func() { + rec, err := s.api.StateWaitMsg(context.TODO(), sm.Cid()) + if err != nil { + log.Error(err) + return + } + + if rec.Receipt.ExitCode == 0 { + return + } + + log.Errorf("Submitting fallback post %s failed: exit %d", sm.Cid(), rec.Receipt.ExitCode) + }() + return nil } diff --git a/storage/miner.go b/storage/miner.go index 9fc26d1b5..110d93565 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -50,6 +50,7 @@ type storageMinerApi interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateMinerFaults(context.Context, address.Address, *types.TipSet) ([]uint64, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sbmock/sbmock.go b/storage/sbmock/sbmock.go index f28df2395..4d88853f5 100644 --- a/storage/sbmock/sbmock.go +++ b/storage/sbmock/sbmock.go @@ -13,6 +13,7 @@ import ( ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "golang.org/x/xerrors" ) @@ -60,7 +61,7 @@ func (sb *SBMock) RateLimit() func() { } } -func (sb *SBMock) AddPiece(size uint64, sectorId uint64, r io.Reader, existingPieces []uint64) (sectorbuilder.PublicPieceInfo, error) { +func (sb *SBMock) AddPiece(ctx context.Context, size uint64, sectorId uint64, r io.Reader, existingPieces []uint64) (sectorbuilder.PublicPieceInfo, error) { sb.lk.Lock() ss, ok := sb.sectors[sectorId] if !ok { @@ -284,7 +285,7 @@ func (sb *SBMock) GenerateEPostCandidates(sectorInfo sectorbuilder.SortedPublicS return out, nil } -func (sb *SBMock) ReadPieceFromSealedSector(sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { +func (sb *SBMock) ReadPieceFromSealedSector(ctx context.Context, sectorID uint64, offset uint64, size uint64, ticket []byte, commD []byte) (io.ReadCloser, error) { if len(sb.sectors[sectorID].pieces) > 1 { panic("implme") } @@ -301,7 +302,7 @@ func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, erro buf := make([]byte, usize) rand.Read(buf) - pi, err := sb.AddPiece(usize, sid, bytes.NewReader(buf), nil) + pi, err := sb.AddPiece(context.TODO(), usize, sid, bytes.NewReader(buf), nil) if err != nil { return 0, nil, err } @@ -309,6 +310,26 @@ func (sb *SBMock) StageFakeData() (uint64, []sectorbuilder.PublicPieceInfo, erro return sid, []sectorbuilder.PublicPieceInfo{pi}, nil } +func (sb *SBMock) FinalizeSector(context.Context, uint64) error { + return nil +} + +func (sb *SBMock) DropStaged(context.Context, uint64) error { + return nil +} + +func (sb *SBMock) SectorPath(typ fs.DataType, sectorID uint64) (fs.SectorPath, error) { + panic("implement me") +} + +func (sb *SBMock) AllocSectorPath(typ fs.DataType, sectorID uint64, cache bool) (fs.SectorPath, error) { + panic("implement me") +} + +func (sb *SBMock) ReleaseSector(fs.DataType, fs.SectorPath) { + panic("implement me") +} + func (m mockVerif) VerifyElectionPost(ctx context.Context, sectorSize uint64, sectorInfo sectorbuilder.SortedPublicSectorInfo, challengeSeed []byte, proof []byte, candidates []sectorbuilder.EPostCandidate, proverID address.Address) (bool, error) { panic("implement me") } diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index 4fdd81d35..6ed5c0cfc 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -48,10 +48,14 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ ), api.Committing: planCommitting, api.CommitWait: planOne( - on(SectorProving{}, api.Proving), + on(SectorProving{}, api.FinalizeSector), on(SectorCommitFailed{}, api.CommitFailed), ), + api.FinalizeSector: planOne( + on(SectorFinalized{}, api.Proving), + ), + api.Proving: planOne( on(SectorFaultReported{}, api.FaultReported), on(SectorFaulty{}, api.Faulty), @@ -150,6 +154,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta return m.handleCommitting, nil case api.CommitWait: return m.handleCommitWait, nil + case api.FinalizeSector: + return m.handleFinalizeSector, nil case api.Proving: // TODO: track sector health / expiration log.Infof("Proving sector %d", state.SectorID) diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index ee4963750..84b1120c8 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -120,6 +120,14 @@ type SectorProving struct{} func (evt SectorProving) apply(*SectorInfo) {} +type SectorFinalized struct{} + +func (evt SectorFinalized) apply(*SectorInfo) {} + +type SectorFinalizeFailed struct{ error } + +func (evt SectorFinalizeFailed) apply(*SectorInfo) {} + // Failed state recovery type SectorRetrySeal struct{} diff --git a/storage/sealing/fsm_test.go b/storage/sealing/fsm_test.go index 7430bb634..24145a2a1 100644 --- a/storage/sealing/fsm_test.go +++ b/storage/sealing/fsm_test.go @@ -50,6 +50,9 @@ func TestHappyPath(t *testing.T) { require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.FinalizeSector) + + m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, api.Proving) } @@ -81,6 +84,9 @@ func TestSeedRevert(t *testing.T) { require.Equal(m.t, m.state.State, api.CommitWait) m.planSingle(SectorProving{}) + require.Equal(m.t, m.state.State, api.FinalizeSector) + + m.planSingle(SectorFinalized{}) require.Equal(m.t, m.state.State, api.Proving) } diff --git a/storage/sealing/garbage.go b/storage/sealing/garbage.go index 4a3b1331b..3274f6aec 100644 --- a/storage/sealing/garbage.go +++ b/storage/sealing/garbage.go @@ -5,6 +5,7 @@ import ( "context" "io" "math" + "math/bits" "math/rand" "runtime" @@ -16,6 +17,11 @@ import ( ) func (m *Sealing) pledgeReader(size uint64, parts uint64) io.Reader { + parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + if size/parts < 127 { + parts = size / 127 + } + piece := sectorbuilder.UserBytesForSectorSize((size/127 + size) / parts) readers := make([]io.Reader, parts) @@ -31,6 +37,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie return nil, nil } + log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes) + deals := make([]actors.StorageDealProposal, len(sizes)) for i, size := range sizes { commP, err := m.fastPledgeCommitment(size, uint64(runtime.NumCPU())) @@ -53,6 +61,8 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie deals[i] = sdp } + log.Infof("Publishing deals for %d", sectorID) + params, aerr := actors.SerializeParams(&actors.PublishStorageDealsParams{ Deals: deals, }) @@ -72,7 +82,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie if err != nil { return nil, err } - r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) + r, err := m.api.StateWaitMsg(ctx, smsg.Cid()) // TODO: more finality if err != nil { return nil, err } @@ -87,12 +97,13 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID uint64, existingPie return nil, xerrors.New("got unexpected number of DealIDs from PublishStorageDeals") } - out := make([]Piece, len(sizes)) + log.Infof("Deals for sector %d: %+v", sectorID, resp.DealIDs) + out := make([]Piece, len(sizes)) for i, size := range sizes { - ppi, err := m.sb.AddPiece(size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, m.pledgeReader(size, uint64(runtime.NumCPU())), existingPieceSizes) if err != nil { - return nil, err + return nil, xerrors.Errorf("add piece: %w", err) } existingPieceSizes = append(existingPieceSizes, size) diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 6d0c6bb46..6e562001d 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -112,7 +112,7 @@ func (m *Sealing) AllocatePiece(size uint64) (sectorID uint64, offset uint64, er func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, sectorID uint64, dealID uint64) error { log.Infof("Seal piece for deal %d", dealID) - ppi, err := m.sb.AddPiece(size, sectorID, r, []uint64{}) + ppi, err := m.sb.AddPiece(ctx, size, sectorID, r, []uint64{}) if err != nil { return xerrors.Errorf("adding piece to sector: %w", err) } @@ -121,6 +121,7 @@ func (m *Sealing) SealPiece(ctx context.Context, size uint64, r io.Reader, secto } func (m *Sealing) newSector(ctx context.Context, sid uint64, dealID uint64, ppi sectorbuilder.PublicPieceInfo) error { + log.Infof("Start sealing %d", sid) return m.sectors.Send(sid, SectorStart{ id: sid, pieces: []Piece{ diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 519245e03..723a2d843 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -4,6 +4,7 @@ import ( "context" sectorbuilder "github.com/filecoin-project/go-sectorbuilder" + "github.com/filecoin-project/go-sectorbuilder/fs" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -232,6 +233,23 @@ func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorProving{}) } +func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error { + // TODO: Maybe wait for some finality + + if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil { + if !xerrors.Is(err, fs.ErrNoSuitablePath) { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) + } + log.Warnf("finalize sector: %v", err) + } + + if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil { + return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)}) + } + + return ctx.Send(SectorFinalized{}) +} + func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { // TODO: check if the fault has already been reported, and that this sector is even valid diff --git a/storage/sealing/utils.go b/storage/sealing/utils.go index 21d6b76bf..b2221b278 100644 --- a/storage/sealing/utils.go +++ b/storage/sealing/utils.go @@ -49,8 +49,11 @@ func fillersFromRem(toFill uint64) ([]uint64, error) { func (m *Sealing) fastPledgeCommitment(size uint64, parts uint64) (commP [sectorbuilder.CommLen]byte, err error) { parts = 1 << bits.Len64(parts) // round down to nearest power of 2 + if size/parts < 127 { + parts = size / 127 + } - piece := sectorbuilder.UserBytesForSectorSize((size + size / 127) / parts) + piece := sectorbuilder.UserBytesForSectorSize((size + size/127) / parts) out := make([]sectorbuilder.PublicPieceInfo, parts) var lk sync.Mutex diff --git a/storage/sealing/utils_test.go b/storage/sealing/utils_test.go index 9f9ca3880..94bf858c1 100644 --- a/storage/sealing/utils_test.go +++ b/storage/sealing/utils_test.go @@ -52,4 +52,11 @@ func TestFastPledge(t *testing.T) { if _, err := s.fastPledgeCommitment(sectorbuilder.UserBytesForSectorSize(sz), 5); err != nil { t.Fatalf("%+v", err) } + + sz = uint64(1024) + + s = Sealing{sb: sbmock.NewMockSectorBuilder(0, sz)} + if _, err := s.fastPledgeCommitment(sectorbuilder.UserBytesForSectorSize(sz), 64); err != nil { + t.Fatalf("%+v", err) + } } diff --git a/storage/sectorblocks/blockstore.go b/storage/sectorblocks/blockstore.go index 36394c1c8..63539e2f7 100644 --- a/storage/sectorblocks/blockstore.go +++ b/storage/sectorblocks/blockstore.go @@ -96,6 +96,7 @@ func (s *SectorBlockStore) Get(c cid.Cid) (blocks.Block, error) { log.Infof("reading block %s from sector %d(+%d;%d)", c, best.SectorID, best.Offset, best.Size) r, err := s.sectorBlocks.sb.ReadPieceFromSealedSector( + context.TODO(), best.SectorID, best.Offset, best.Size,