diff --git a/.circleci/config.yml b/.circleci/config.yml index 31245589b..847e9be05 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -917,6 +917,11 @@ workflows: suite: itest-sector_finalize_early target: "./itests/sector_finalize_early_test.go" + - test: + name: test-itest-sector_make_cc_avail + suite: itest-sector_make_cc_avail + target: "./itests/sector_make_cc_avail_test.go" + - test: name: test-itest-sector_miner_collateral suite: itest-sector_miner_collateral @@ -927,6 +932,16 @@ workflows: suite: itest-sector_pledge target: "./itests/sector_pledge_test.go" + - test: + name: test-itest-sector_prefer_no_upgrade + suite: itest-sector_prefer_no_upgrade + target: "./itests/sector_prefer_no_upgrade_test.go" + + - test: + name: test-itest-sector_revert_available + suite: itest-sector_revert_available + target: "./itests/sector_revert_available_test.go" + - test: name: test-itest-sector_terminate suite: itest-sector_terminate diff --git a/api/api_net.go b/api/api_net.go index 74581e3ac..ae53e4c0d 100644 --- a/api/api_net.go +++ b/api/api_net.go @@ -2,6 +2,7 @@ package api import ( "context" + "time" metrics "github.com/libp2p/go-libp2p-core/metrics" "github.com/libp2p/go-libp2p-core/network" @@ -25,6 +26,7 @@ type Net interface { NetConnectedness(context.Context, peer.ID) (network.Connectedness, error) //perm:read NetPeers(context.Context) ([]peer.AddrInfo, error) //perm:read + NetPing(context.Context, peer.ID) (time.Duration, error) //perm:read NetConnect(context.Context, peer.AddrInfo) error //perm:write NetAddrsListen(context.Context) (peer.AddrInfo, error) //perm:read NetDisconnect(context.Context, peer.ID) error //perm:write diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 1dce7fa9c..d59ed4aba 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -8,6 +8,7 @@ import ( context "context" json "encoding/json" reflect "reflect" + time "time" address "github.com/filecoin-project/go-address" bitfield "github.com/filecoin-project/go-bitfield" @@ -1856,6 +1857,21 @@ func (mr *MockFullNodeMockRecorder) NetPeers(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPeers", reflect.TypeOf((*MockFullNode)(nil).NetPeers), arg0) } +// NetPing mocks base method. +func (m *MockFullNode) NetPing(arg0 context.Context, arg1 peer.ID) (time.Duration, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NetPing", arg0, arg1) + ret0, _ := ret[0].(time.Duration) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NetPing indicates an expected call of NetPing. +func (mr *MockFullNodeMockRecorder) NetPing(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPing", reflect.TypeOf((*MockFullNode)(nil).NetPing), arg0, arg1) +} + // NetProtectAdd mocks base method. func (m *MockFullNode) NetProtectAdd(arg0 context.Context, arg1 []peer.ID) error { m.ctrl.T.Helper() diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 80feedd3e..e26967baf 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -597,6 +597,8 @@ type NetStruct struct { NetPeers func(p0 context.Context) ([]peer.AddrInfo, error) `perm:"read"` + NetPing func(p0 context.Context, p1 peer.ID) (time.Duration, error) `perm:"read"` + NetProtectAdd func(p0 context.Context, p1 []peer.ID) error `perm:"admin"` NetProtectList func(p0 context.Context) ([]peer.ID, error) `perm:"read"` @@ -3712,6 +3714,17 @@ func (s *NetStub) NetPeers(p0 context.Context) ([]peer.AddrInfo, error) { return *new([]peer.AddrInfo), ErrNotSupported } +func (s *NetStruct) NetPing(p0 context.Context, p1 peer.ID) (time.Duration, error) { + if s.Internal.NetPing == nil { + return *new(time.Duration), ErrNotSupported + } + return s.Internal.NetPing(p0, p1) +} + +func (s *NetStub) NetPing(p0 context.Context, p1 peer.ID) (time.Duration, error) { + return *new(time.Duration), ErrNotSupported +} + func (s *NetStruct) NetProtectAdd(p0 context.Context, p1 []peer.ID) error { if s.Internal.NetProtectAdd == nil { return ErrNotSupported diff --git a/api/v0api/gomock_reflect_3555711957/prog.go b/api/v0api/gomock_reflect_3555711957/prog.go new file mode 100644 index 000000000..39ca2319d --- /dev/null +++ b/api/v0api/gomock_reflect_3555711957/prog.go @@ -0,0 +1,64 @@ +package main + +import ( + "encoding/gob" + "flag" + "fmt" + "os" + "path" + "reflect" + + "github.com/golang/mock/mockgen/model" + + pkg_ "github.com/filecoin-project/lotus/api/v0api" +) + +var output = flag.String("output", "", "The output file name, or empty to use stdout.") + +func main() { + flag.Parse() + + its := []struct { + sym string + typ reflect.Type + }{ + + {"FullNode", reflect.TypeOf((*pkg_.FullNode)(nil)).Elem()}, + } + pkg := &model.Package{ + // NOTE: This behaves contrary to documented behaviour if the + // package name is not the final component of the import path. + // The reflect package doesn't expose the package name, though. + Name: path.Base("github.com/filecoin-project/lotus/api/v0api"), + } + + for _, it := range its { + intf, err := model.InterfaceFromInterfaceType(it.typ) + if err != nil { + fmt.Fprintf(os.Stderr, "Reflection: %v\n", err) + os.Exit(1) + } + intf.Name = it.sym + pkg.Interfaces = append(pkg.Interfaces, intf) + } + + outfile := os.Stdout + if len(*output) != 0 { + var err error + outfile, err = os.Create(*output) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to open output file %q", *output) + } + defer func() { + if err := outfile.Close(); err != nil { + fmt.Fprintf(os.Stderr, "failed to close output file %q", *output) + os.Exit(1) + } + }() + } + + if err := gob.NewEncoder(outfile).Encode(pkg); err != nil { + fmt.Fprintf(os.Stderr, "gob encode: %v\n", err) + os.Exit(1) + } +} diff --git a/api/v0api/v0mocks/mock_full.go b/api/v0api/v0mocks/mock_full.go index e18c8bfe7..6cb80d894 100644 --- a/api/v0api/v0mocks/mock_full.go +++ b/api/v0api/v0mocks/mock_full.go @@ -7,6 +7,7 @@ package v0mocks import ( context "context" reflect "reflect" + time "time" address "github.com/filecoin-project/go-address" bitfield "github.com/filecoin-project/go-bitfield" @@ -1769,6 +1770,21 @@ func (mr *MockFullNodeMockRecorder) NetPeers(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPeers", reflect.TypeOf((*MockFullNode)(nil).NetPeers), arg0) } +// NetPing mocks base method. +func (m *MockFullNode) NetPing(arg0 context.Context, arg1 peer.ID) (time.Duration, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NetPing", arg0, arg1) + ret0, _ := ret[0].(time.Duration) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NetPing indicates an expected call of NetPing. +func (mr *MockFullNodeMockRecorder) NetPing(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPing", reflect.TypeOf((*MockFullNode)(nil).NetPing), arg0, arg1) +} + // NetProtectAdd mocks base method. func (m *MockFullNode) NetProtectAdd(arg0 context.Context, arg1 []peer.ID) error { m.ctrl.T.Helper() diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index eb8970dd7..6be5c148a 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index e67bd4416..6297eb1d6 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 82d78d414..3db89a892 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index 5db508008..888ca0254 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -5,12 +5,12 @@ import ( "errors" "fmt" + "github.com/filecoin-project/lotus/blockstore" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/filecoin-project/lotus/chain/state" - "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/chain/rand" "github.com/filecoin-project/go-address" @@ -223,7 +223,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri return nil, err } - buffStore := blockstore.NewBuffered(sm.cs.StateBlockstore()) + buffStore := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync()) vmopt := &vm.VMOpts{ StateBase: stateCid, Epoch: vmHeight, diff --git a/chain/vm/fvm.go b/chain/vm/fvm.go index 922eb77c5..72c84f966 100644 --- a/chain/vm/fvm.go +++ b/chain/vm/fvm.go @@ -217,6 +217,7 @@ type FVM struct { } func NewFVM(ctx context.Context, opts *VMOpts) (*FVM, error) { + log.Info("using the FVM, this is experimental!") circToReport := opts.FilVested // For v14 (and earlier), we perform the FilVested portion of the calculation, and let the FVM dynamically do the rest // v15 and after, the circ supply is always constant per epoch, so we calculate the base and report it at creation diff --git a/cli/multisig.go b/cli/multisig.go index 0179378a7..d9255f6e4 100644 --- a/cli/multisig.go +++ b/cli/multisig.go @@ -161,6 +161,7 @@ var msigCreateCmd = &cli.Command{ msgCid := sm.Cid() fmt.Println("sent create in message: ", msgCid) + fmt.Println("waiting for confirmation..") // wait for it to get mined into a block wait, err := api.StateWaitMsg(ctx, msgCid, uint64(cctx.Int("confidence")), build.Finality, true) diff --git a/cli/net.go b/cli/net.go index 0ff15e38a..c9a9a4392 100644 --- a/cli/net.go +++ b/cli/net.go @@ -1,12 +1,14 @@ package cli import ( + "context" "encoding/json" "fmt" "os" "sort" "strings" "text/tabwriter" + "time" "github.com/dustin/go-humanize" "github.com/urfave/cli/v2" @@ -28,6 +30,7 @@ var NetCmd = &cli.Command{ Usage: "Manage P2P Network", Subcommands: []*cli.Command{ NetPeers, + NetPing, NetConnect, NetListen, NetId, @@ -117,6 +120,82 @@ var NetPeers = &cli.Command{ }, } +var NetPing = &cli.Command{ + Name: "ping", + Usage: "Ping peers", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "count", + Value: 10, + Aliases: []string{"c"}, + Usage: "specify the number of times it should ping", + }, + &cli.DurationFlag{ + Name: "interval", + Value: time.Second, + Aliases: []string{"i"}, + Usage: "minimum time between pings", + }, + }, + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() != 1 { + return xerrors.Errorf("please provide a peerID") + } + + api, closer, err := GetAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + pis, err := addrInfoFromArg(ctx, cctx) + if err != nil { + return err + } + + count := cctx.Int("count") + interval := cctx.Duration("interval") + + for _, pi := range pis { + err := api.NetConnect(ctx, pi) + if err != nil { + return xerrors.Errorf("connect: %w", err) + } + + fmt.Printf("PING %s\n", pi.ID) + var avg time.Duration + var successful int + + for i := 0; i < count && ctx.Err() == nil; i++ { + start := time.Now() + + rtt, err := api.NetPing(ctx, pi.ID) + if err != nil { + if ctx.Err() != nil { + break + } + log.Errorf("Ping failed: error=%v", err) + continue + } + fmt.Printf("Pong received: time=%v\n", rtt) + avg = avg + rtt + successful++ + + wctx, cancel := context.WithTimeout(ctx, time.Until(start.Add(interval))) + <-wctx.Done() + cancel() + } + + if successful > 0 { + fmt.Printf("Average latency: %v\n", avg/time.Duration(successful)) + } + } + return nil + }, +} + var NetScores = &cli.Command{ Name: "scores", Usage: "Print peers' pubsub scores", @@ -192,45 +271,9 @@ var NetConnect = &cli.Command{ defer closer() ctx := ReqContext(cctx) - pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice()) + pis, err := addrInfoFromArg(ctx, cctx) if err != nil { - a, perr := address.NewFromString(cctx.Args().First()) - if perr != nil { - return err - } - - na, fc, err := GetFullNodeAPI(cctx) - if err != nil { - return err - } - defer fc() - - mi, err := na.StateMinerInfo(ctx, a, types.EmptyTSK) - if err != nil { - return xerrors.Errorf("getting miner info: %w", err) - } - - if mi.PeerId == nil { - return xerrors.Errorf("no PeerID for miner") - } - multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs)) - for i, a := range mi.Multiaddrs { - maddr, err := multiaddr.NewMultiaddrBytes(a) - if err != nil { - log.Warnf("parsing multiaddr %d (%x): %s", i, a, err) - continue - } - multiaddrs = append(multiaddrs, maddr) - } - - pi := peer.AddrInfo{ - ID: *mi.PeerId, - Addrs: multiaddrs, - } - - fmt.Printf("%s -> %s\n", a, pi) - - pis = append(pis, pi) + return err } for _, pi := range pis { @@ -247,6 +290,51 @@ var NetConnect = &cli.Command{ }, } +func addrInfoFromArg(ctx context.Context, cctx *cli.Context) ([]peer.AddrInfo, error) { + pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice()) + if err != nil { + a, perr := address.NewFromString(cctx.Args().First()) + if perr != nil { + return nil, err + } + + na, fc, err := GetFullNodeAPI(cctx) + if err != nil { + return nil, err + } + defer fc() + + mi, err := na.StateMinerInfo(ctx, a, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting miner info: %w", err) + } + + if mi.PeerId == nil { + return nil, xerrors.Errorf("no PeerID for miner") + } + multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs)) + for i, a := range mi.Multiaddrs { + maddr, err := multiaddr.NewMultiaddrBytes(a) + if err != nil { + log.Warnf("parsing multiaddr %d (%x): %s", i, a, err) + continue + } + multiaddrs = append(multiaddrs, maddr) + } + + pi := peer.AddrInfo{ + ID: *mi.PeerId, + Addrs: multiaddrs, + } + + fmt.Printf("%s -> %s\n", a, pi) + + pis = append(pis, pi) + } + + return pis, err +} + var NetId = &cli.Command{ Name: "id", Usage: "Get node identity", diff --git a/cmd/lotus-miner/market.go b/cmd/lotus-miner/market.go index c7089e74e..2974078d6 100644 --- a/cmd/lotus-miner/market.go +++ b/cmd/lotus-miner/market.go @@ -212,6 +212,7 @@ var setAskCmd = &cli.Command{ Name: "max-piece-size", Usage: "Set maximum piece size (w/bit-padding, in bytes) in ask to `SIZE`", DefaultText: "miner sector size", + Value: "0", }, }, Action: func(cctx *cli.Context) error { diff --git a/cmd/lotus-miner/proving.go b/cmd/lotus-miner/proving.go index 77e0e747a..7936f426b 100644 --- a/cmd/lotus-miner/proving.go +++ b/cmd/lotus-miner/proving.go @@ -365,6 +365,10 @@ var provingCheckProvableCmd = &cli.Command{ Name: "storage-id", Usage: "filter sectors by storage path (path id)", }, + &cli.BoolFlag{ + Name: "faulty", + Usage: "only check faulty sectors", + }, }, Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 1 { @@ -376,7 +380,7 @@ var provingCheckProvableCmd = &cli.Command{ return xerrors.Errorf("could not parse deadline index: %w", err) } - api, closer, err := lcli.GetFullNodeAPI(cctx) + api, closer, err := lcli.GetFullNodeAPIV1(cctx) if err != nil { return err } @@ -428,6 +432,38 @@ var provingCheckProvableCmd = &cli.Command{ } } + if cctx.Bool("faulty") { + parts, err := getAllPartitions(ctx, addr, api) + if err != nil { + return xerrors.Errorf("getting partitions: %w", err) + } + + if filter != nil { + for k := range filter { + set, err := parts.FaultySectors.IsSet(uint64(k.Number)) + if err != nil { + return err + } + if !set { + delete(filter, k) + } + } + } else { + filter = map[abi.SectorID]struct{}{} + + err = parts.FaultySectors.ForEach(func(s uint64) error { + filter[abi.SectorID{ + Miner: abi.ActorID(mid), + Number: abi.SectorNumber(s), + }] = struct{}{} + return nil + }) + if err != nil { + return err + } + } + } + for parIdx, par := range partitions { sectors := make(map[abi.SectorNumber]struct{}) diff --git a/cmd/lotus-miner/storage.go b/cmd/lotus-miner/storage.go index 91c6ea9db..6d3ced35c 100644 --- a/cmd/lotus-miner/storage.go +++ b/cmd/lotus-miner/storage.go @@ -22,6 +22,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" @@ -557,7 +558,7 @@ var storageListSectorsCmd = &cli.Command{ } defer closer() - napi, closer2, err := lcli.GetFullNodeAPI(cctx) + napi, closer2, err := lcli.GetFullNodeAPIV1(cctx) if err != nil { return err } @@ -592,6 +593,11 @@ var storageListSectorsCmd = &cli.Command{ } } + allParts, err := getAllPartitions(ctx, maddr, napi) + if err != nil { + return xerrors.Errorf("getting partition states: %w", err) + } + type entry struct { id abi.SectorNumber storage storiface.ID @@ -601,6 +607,8 @@ var storageListSectorsCmd = &cli.Command{ primary, copy, main, seal, store bool state api.SectorState + + faulty bool } var list []entry @@ -610,6 +618,10 @@ var storageListSectorsCmd = &cli.Command{ if err != nil { return xerrors.Errorf("getting sector status for sector %d: %w", sector, err) } + fault, err := allParts.FaultySectors.IsSet(uint64(sector)) + if err != nil { + return xerrors.Errorf("checking if sector is faulty: %w", err) + } for _, ft := range storiface.PathTypes { si, err := nodeApi.StorageFindSector(ctx, sid(sector), ft, mi.SectorSize, false) @@ -632,7 +644,8 @@ var storageListSectorsCmd = &cli.Command{ seal: info.CanSeal, store: info.CanStore, - state: st.State, + state: st.State, + faulty: fault, }) } } @@ -660,6 +673,7 @@ var storageListSectorsCmd = &cli.Command{ tablewriter.Col("Sector"), tablewriter.Col("Type"), tablewriter.Col("State"), + tablewriter.Col("Faulty"), tablewriter.Col("Primary"), tablewriter.Col("Path use"), tablewriter.Col("URLs"), @@ -687,6 +701,10 @@ var storageListSectorsCmd = &cli.Command{ "Path use": maybeStr(e.seal, color.FgMagenta, "seal ") + maybeStr(e.store, color.FgCyan, "store"), "URLs": e.urls, } + if e.faulty { + // only set when there is a fault, so the column is hidden with no faults + m["Faulty"] = color.RedString("faulty") + } tw.Write(m) } @@ -694,6 +712,52 @@ var storageListSectorsCmd = &cli.Command{ }, } +func getAllPartitions(ctx context.Context, maddr address.Address, napi api.FullNode) (api.Partition, error) { + deadlines, err := napi.StateMinerDeadlines(ctx, maddr, types.EmptyTSK) + if err != nil { + return api.Partition{}, xerrors.Errorf("getting deadlines: %w", err) + } + + out := api.Partition{ + AllSectors: bitfield.New(), + FaultySectors: bitfield.New(), + RecoveringSectors: bitfield.New(), + LiveSectors: bitfield.New(), + ActiveSectors: bitfield.New(), + } + + for dlIdx := range deadlines { + partitions, err := napi.StateMinerPartitions(ctx, maddr, uint64(dlIdx), types.EmptyTSK) + if err != nil { + return api.Partition{}, xerrors.Errorf("getting partitions for deadline %d: %w", dlIdx, err) + } + + for _, partition := range partitions { + out.AllSectors, err = bitfield.MergeBitFields(out.AllSectors, partition.AllSectors) + if err != nil { + return api.Partition{}, err + } + out.FaultySectors, err = bitfield.MergeBitFields(out.FaultySectors, partition.FaultySectors) + if err != nil { + return api.Partition{}, err + } + out.RecoveringSectors, err = bitfield.MergeBitFields(out.RecoveringSectors, partition.RecoveringSectors) + if err != nil { + return api.Partition{}, err + } + out.LiveSectors, err = bitfield.MergeBitFields(out.LiveSectors, partition.LiveSectors) + if err != nil { + return api.Partition{}, err + } + out.ActiveSectors, err = bitfield.MergeBitFields(out.ActiveSectors, partition.ActiveSectors) + if err != nil { + return api.Partition{}, err + } + } + } + return out, nil +} + func maybeStr(c bool, col color.Attribute, s string) string { if !c { return "" diff --git a/cmd/lotus-wallet/main.go b/cmd/lotus-wallet/main.go index 91f23d092..045e55327 100644 --- a/cmd/lotus-wallet/main.go +++ b/cmd/lotus-wallet/main.go @@ -278,7 +278,7 @@ func openRepo(cctx *cli.Context) (repo.LockedRepo, types.KeyStore, error) { return nil, nil, err } if !ok { - if err := r.Init(repo.Worker); err != nil { + if err := r.Init(repo.Wallet); err != nil { return nil, nil, err } } diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index d46236af3..c176203a5 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -88,6 +88,7 @@ * [NetLimit](#NetLimit) * [NetPeerInfo](#NetPeerInfo) * [NetPeers](#NetPeers) + * [NetPing](#NetPing) * [NetProtectAdd](#NetProtectAdd) * [NetProtectList](#NetProtectList) * [NetProtectRemove](#NetProtectRemove) @@ -1848,6 +1849,20 @@ Response: ] ``` +### NetPing + + +Perms: read + +Inputs: +```json +[ + "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" +] +``` + +Response: `60000000000` + ### NetProtectAdd diff --git a/documentation/en/api-v0-methods.md b/documentation/en/api-v0-methods.md index eb195df8a..6973a6651 100644 --- a/documentation/en/api-v0-methods.md +++ b/documentation/en/api-v0-methods.md @@ -131,6 +131,7 @@ * [NetLimit](#NetLimit) * [NetPeerInfo](#NetPeerInfo) * [NetPeers](#NetPeers) + * [NetPing](#NetPing) * [NetProtectAdd](#NetProtectAdd) * [NetProtectList](#NetProtectList) * [NetProtectRemove](#NetProtectRemove) @@ -3908,6 +3909,20 @@ Response: ] ``` +### NetPing + + +Perms: read + +Inputs: +```json +[ + "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" +] +``` + +Response: `60000000000` + ### NetProtectAdd diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 48b4540fe..d2beb9f08 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -137,6 +137,7 @@ * [NetLimit](#NetLimit) * [NetPeerInfo](#NetPeerInfo) * [NetPeers](#NetPeers) + * [NetPing](#NetPing) * [NetProtectAdd](#NetProtectAdd) * [NetProtectList](#NetProtectList) * [NetProtectRemove](#NetProtectRemove) @@ -4270,6 +4271,20 @@ Response: ] ``` +### NetPing + + +Perms: read + +Inputs: +```json +[ + "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf" +] +``` + +Response: `60000000000` + ### NetProtectAdd diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 6e7bd8eff..3ec75b251 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -1200,6 +1200,7 @@ USAGE: COMMANDS: peers Print peers + ping Ping peers connect Connect to a peer listen List listen addresses id Get node identity @@ -1235,6 +1236,21 @@ OPTIONS: ``` +### lotus-miner net ping +``` +NAME: + lotus-miner net ping - Ping peers + +USAGE: + lotus-miner net ping [command options] [arguments...] + +OPTIONS: + --count value, -c value specify the number of times it should ping (default: 10) + --interval value, -i value minimum time between pings (default: 1s) + --help, -h show help (default: false) + +``` + ### lotus-miner net connect ``` NAME: @@ -2095,6 +2111,7 @@ OPTIONS: --only-bad print only bad sectors (default: false) --slow run slower checks (default: false) --storage-id value filter sectors by storage path (path id) + --faulty only check faulty sectors (default: false) --help, -h show help (default: false) ``` diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index a08f01039..a9c729930 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -2602,6 +2602,7 @@ USAGE: COMMANDS: peers Print peers + ping Ping peers connect Connect to a peer listen List listen addresses id Get node identity @@ -2637,6 +2638,21 @@ OPTIONS: ``` +### lotus net ping +``` +NAME: + lotus net ping - Ping peers + +USAGE: + lotus net ping [command options] [arguments...] + +OPTIONS: + --count value, -c value specify the number of times it should ping (default: 10) + --interval value, -i value minimum time between pings (default: 1s) + --help, -h show help (default: false) + +``` + ### lotus net connect ``` NAME: diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index de45be748..634da3b9d 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -325,18 +325,33 @@ # env var: LOTUS_SEALING_MAXWAITDEALSSECTORS #MaxWaitDealsSectors = 2 - # Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited) + # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited) # # type: uint64 # env var: LOTUS_SEALING_MAXSEALINGSECTORS #MaxSealingSectors = 0 - # Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited) + # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited) # # type: uint64 # env var: LOTUS_SEALING_MAXSEALINGSECTORSFORDEALS #MaxSealingSectorsForDeals = 0 + # Prefer creating new sectors even if there are sectors Available for upgrading. + # This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it + # possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing + # flow when the volume of storage deals is lower. + # + # type: bool + # env var: LOTUS_SEALING_PREFERNEWSECTORSFORDEALS + #PreferNewSectorsForDeals = false + + # Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals) + # + # type: uint64 + # env var: LOTUS_SEALING_MAXUPGRADINGSECTORS + #MaxUpgradingSectors = 0 + # CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will # live before it must be extended or converted into sector containing deals before it is # terminated. Value must be between 180-540 days inclusive diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index 72e09df06..32f0c9028 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -166,8 +166,6 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size) - log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err) - if xerrors.Is(err, storiface.ErrSectorNotFound) { log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) err = nil diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index d2b51edc9..de3ae9f2a 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -473,6 +473,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors }) + log.Debugw("updateInput matching", "matches", len(matches), "toAssign", len(toAssign), "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces)) + var assigned int for _, mt := range matches { if m.pendingPieces[mt.deal].assigned { @@ -506,6 +508,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } } + log.Debugw("updateInput matching done", "matches", len(matches), "toAssign", len(toAssign), "assigned", assigned, "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces)) + if len(toAssign) > 0 { log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors) if err := m.tryGetDealSector(ctx, sp, expF); err != nil { @@ -550,7 +554,7 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize return curEpoch + minDur, curEpoch + maxDur, nil } -func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { +func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { if len(m.available) == 0 { return false, nil } @@ -619,56 +623,8 @@ func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSeal return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{}) } -func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error { - m.startupWait.Wait() - - if m.nextDealSector != nil { - return nil // new sector is being created right now - } - - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting storage config: %w", err) - } - - if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals { - return nil - } - - if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors { - return nil - } - - got, err := m.tryGetUpgradeSector(ctx, sp, ef) - if err != nil { - return err - } - if got { - return nil - } - - if !cfg.MakeNewSectorForDeals { - return nil - } - - sid, err := m.createSector(ctx, cfg, sp) - if err != nil { - return err - } - - m.nextDealSector = &sid - - log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) - return m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: sp, - }) -} - // call with m.inputLk func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) { - // Now actually create a new sector - sid, err := m.sc.Next() if err != nil { return 0, xerrors.Errorf("getting sector number: %w", err) @@ -682,7 +638,74 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi // update stats early, fsm planner would do that async m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState) - return sid, nil + return sid, err +} + +func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error { + m.startupWait.Wait() + + if m.nextDealSector != nil { + return nil // new sector is being created right now + } + + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + // if we're above WaitDeals limit, we don't want to add more staging sectors + if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors { + return nil + } + + maxUpgrading := cfg.MaxSealingSectorsForDeals + if cfg.MaxUpgradingSectors > 0 { + maxUpgrading = cfg.MaxUpgradingSectors + } + + canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals) + canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading) + + // we want to try to upgrade when: + // - we can upgrade and prefer upgrades + // - we don't prefer upgrades, but can't create a new sector + shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate) + + log.Infow("new deal sector decision", + "sealing", m.stats.curSealing(), + "maxSeal", cfg.MaxSealingSectorsForDeals, + "maxUpgrade", maxUpgrading, + "preferNew", cfg.PreferNewSectorsForDeals, + "canCreate", canCreate, + "canUpgrade", canUpgrade, + "shouldUpgrade", shouldUpgrade) + + if shouldUpgrade { + got, err := m.maybeUpgradeSector(ctx, sp, ef) + if err != nil { + return err + } + if got { + return nil + } + } + + if canCreate { + sid, err := m.createSector(ctx, cfg, sp) + if err != nil { + return err + } + m.nextDealSector = &sid + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + if err := m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }); err != nil { + return err + } + } + return nil } func (m *Sealing) StartPacking(sid abi.SectorNumber) error { diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 20bd2b564..0470db38e 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -18,6 +18,10 @@ type Config struct { // includes failed, 0 = no limit MaxSealingSectorsForDeals uint64 + PreferNewSectorsForDeals bool + + MaxUpgradingSectors uint64 + MakeNewSectorForDeals bool MakeCCSectorsAvailable bool diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 90fa5090a..dedba2fc6 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -184,14 +184,14 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect } func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sector SectorInfo) error { + if err := failedCooldown(ctx, sector); err != nil { + return err + } + if sector.ReplicaUpdateMessage != nil { mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.ReplicaUpdateMessage) if err != nil { // API error - if err := failedCooldown(ctx, sector); err != nil { - return err - } - return ctx.Send(SectorRetrySubmitReplicaUpdateWait{}) } @@ -248,10 +248,6 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect return ctx.Send(SectorAbortUpgrade{}) } - if err := failedCooldown(ctx, sector); err != nil { - return err - } - return ctx.Send(SectorRetrySubmitReplicaUpdate{}) } diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index b6fd6e173..1ee228ccb 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -16,7 +16,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e } if si.State != Proving { - return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade") + return xerrors.Errorf("unable to snap-up sectors not in the 'Proving' state") } if si.hasDeals() { diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index 98b5e0761..cd876a3fd 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -7,18 +7,13 @@ import ( "testing" "time" - logging "github.com/ipfs/go-log/v2" - - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" - sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/itests/kit" ) @@ -34,7 +29,23 @@ func TestCCUpgrade(t *testing.T) { //stm: @MINER_SECTOR_LIST_001 kit.QuietMiningLogs() - runTestCCUpgrade(t) + n := runTestCCUpgrade(t) + + t.Run("post", func(t *testing.T) { + ctx := context.Background() + ts, err := n.ChainHead(ctx) + require.NoError(t, err) + start := ts.Height() + // wait for a full proving period + t.Log("waiting for chain") + + n.WaitTillChain(ctx, func(ts *types.TipSet) bool { + if ts.Height() > start+abi.ChainEpoch(2880) { + return true + } + return false + }) + }) } func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { @@ -62,7 +73,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { require.NoError(t, err) require.Less(t, 50000, int(si.Expiration)) } - waitForSectorActive(ctx, t, CCUpgrade, client, maddr) + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) //stm: @SECTOR_CC_UPGRADE_001 err = miner.SectorMarkForUpgrade(ctx, sl[0], true) @@ -90,105 +101,3 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { return client } - -func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) { - for { - active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) - require.NoError(t, err) - for _, si := range active { - if si.SectorNumber == sn { - fmt.Printf("ACTIVE\n") - return - } - } - - time.Sleep(time.Second) - } -} - -func TestCCUpgradeAndPoSt(t *testing.T) { - kit.QuietMiningLogs() - _ = logging.SetLogLevel("storageminer", "INFO") - t.Run("upgrade and then post", func(t *testing.T) { - ctx := context.Background() - n := runTestCCUpgrade(t) - ts, err := n.ChainHead(ctx) - require.NoError(t, err) - start := ts.Height() - // wait for a full proving period - t.Log("waiting for chain") - - n.WaitTillChain(ctx, func(ts *types.TipSet) bool { - if ts.Height() > start+abi.ChainEpoch(2880) { - return true - } - return false - }) - }) -} - -func TestAbortUpgradeAvailable(t *testing.T) { - kit.QuietMiningLogs() - - ctx := context.Background() - blockTime := 1 * time.Millisecond - - client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC()) - ens.InterconnectAll().BeginMiningMustPost(blockTime) - - maddr, err := miner.ActorAddress(ctx) - if err != nil { - t.Fatal(err) - } - - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) - fmt.Printf("CCUpgrade: %d\n", CCUpgrade) - - miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) - require.NoError(t, err) - require.Len(t, sl, 1, "expected 1 sector") - require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") - { - si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) - require.NoError(t, err) - require.Less(t, 50000, int(si.Expiration)) - } - waitForSectorActive(ctx, t, CCUpgrade, client, maddr) - - err = miner.SectorMarkForUpgrade(ctx, sl[0], true) - require.NoError(t, err) - - sl, err = miner.SectorsList(ctx) - require.NoError(t, err) - require.Len(t, sl, 1, "expected 1 sector") - - ss, err := miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - - for i := 0; i < 100; i++ { - ss, err = miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - if ss.State == api.SectorState(sealing.Proving) { - time.Sleep(50 * time.Millisecond) - continue - } - - require.Equal(t, api.SectorState(sealing.Available), ss.State) - break - } - - require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0])) - - for i := 0; i < 100; i++ { - ss, err = miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - if ss.State == api.SectorState(sealing.Available) { - time.Sleep(50 * time.Millisecond) - continue - } - - require.Equal(t, api.SectorState(sealing.Proving), ss.State) - break - } -} diff --git a/itests/kit/node_full.go b/itests/kit/node_full.go index b606db8f4..1714e01e0 100644 --- a/itests/kit/node_full.go +++ b/itests/kit/node_full.go @@ -2,7 +2,9 @@ package kit import ( "context" + "fmt" "testing" + "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -67,6 +69,21 @@ func (f *TestFullNode) WaitTillChain(ctx context.Context, pred ChainPredicate) * return nil } +func (f *TestFullNode) WaitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, maddr address.Address) { + for { + active, err := f.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + for _, si := range active { + if si.SectorNumber == sn { + fmt.Printf("ACTIVE\n") + return + } + } + + time.Sleep(time.Second) + } +} + // ChainPredicate encapsulates a chain condition. type ChainPredicate func(set *types.TipSet) bool diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index 3ce89c034..ae77abe5a 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -102,7 +102,7 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec st, err := tm.StorageMiner.SectorsStatus(ctx, n, false) require.NoError(tm.t, err) states[st.State]++ - if st.State == api.SectorState(sealing.Proving) { + if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) { delete(toCheck, n) } if strings.Contains(string(st.State), "Fail") { diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 200f44af0..3fbacabcb 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -5,6 +5,11 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" @@ -160,6 +165,17 @@ func ConstructorOpts(extra ...node.Option) NodeOpt { } } +func MutateSealingConfig(mut func(sc *config.SealingConfig)) NodeOpt { + return ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cf := config.DefaultStorageMiner() + mut(&cf.Sealing) + return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil + }, nil + }))) +} + // SectorSize sets the sector size for this miner. Start() will populate the // corresponding proof type depending on the network version (genesis network // version if the Ensemble is unstarted, or the current network version diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go index 0f0fcdec6..7974870b6 100644 --- a/itests/sector_finalize_early_test.go +++ b/itests/sector_finalize_early_test.go @@ -9,13 +9,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/itests/kit" - "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" ) func TestDealsWithFinalizeEarly(t *testing.T) { @@ -34,14 +29,7 @@ func TestDealsWithFinalizeEarly(t *testing.T) { var blockTime = 50 * time.Millisecond - client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts( - node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { - return func() (sealiface.Config, error) { - cf := config.DefaultStorageMiner() - cf.Sealing.FinalizeEarly = true - return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil - }, nil - })))) // no mock proofs. + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { sc.FinalizeEarly = true })) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) dh := kit.NewDealHarness(t, client, miner, miner) diff --git a/itests/sector_make_cc_avail_test.go b/itests/sector_make_cc_avail_test.go new file mode 100644 index 000000000..094367e96 --- /dev/null +++ b/itests/sector_make_cc_avail_test.go @@ -0,0 +1,77 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node/config" +) + +func TestMakeAvailable(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { + sc.MakeCCSectorsAvailable = true + })) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + status, err := miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, api.SectorState(sealing.Available), status.State) + + dh := kit.NewDealHarness(t, client, miner, miner) + deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ + Rseed: 6, + SuspendUntilCryptoeconStable: true, + }) + outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false) + kit.AssertFilesEqual(t, inPath, outPath) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + status, err = miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, 1, len(status.Deals)) + miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{ + CCUpgrade: {}, + }) +} diff --git a/itests/sector_prefer_no_upgrade_test.go b/itests/sector_prefer_no_upgrade_test.go new file mode 100644 index 000000000..11fd2c1de --- /dev/null +++ b/itests/sector_prefer_no_upgrade_test.go @@ -0,0 +1,89 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node/config" +) + +func TestPreferNoUpgrade(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { + sc.PreferNewSectorsForDeals = true + })) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 2) + + { + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + err = miner.SectorMarkForUpgrade(ctx, sl[0], true) + require.NoError(t, err) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + } + + { + dh := kit.NewDealHarness(t, client, miner, miner) + deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ + Rseed: 6, + SuspendUntilCryptoeconStable: true, + }) + outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false) + kit.AssertFilesEqual(t, inPath, outPath) + } + + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 2, "expected 2 sectors") + + { + status, err := miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, api.SectorState(sealing.Available), status.State) + } + + { + status, err := miner.SectorsStatus(ctx, Sealed, true) + require.NoError(t, err) + assert.Equal(t, 1, len(status.Deals)) + miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{ + Sealed: {}, + }) + } +} diff --git a/itests/sector_revert_available_test.go b/itests/sector_revert_available_test.go new file mode 100644 index 000000000..6827a85fa --- /dev/null +++ b/itests/sector_revert_available_test.go @@ -0,0 +1,84 @@ +package itests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" +) + +func TestAbortUpgradeAvailable(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC()) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + fmt.Printf("CCUpgrade: %d\n", CCUpgrade) + + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + err = miner.SectorMarkForUpgrade(ctx, sl[0], true) + require.NoError(t, err) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + ss, err := miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Proving) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Available), ss.State) + break + } + + require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0])) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Available) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Proving), ss.State) + break + } +} diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 972c196f7..ba5ffcc03 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -714,13 +714,28 @@ Note that setting this number too high in relation to deal ingestion rate may re Name: "MaxSealingSectors", Type: "uint64", - Comment: `Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)`, + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)`, }, { Name: "MaxSealingSectorsForDeals", Type: "uint64", - Comment: `Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)`, + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)`, + }, + { + Name: "PreferNewSectorsForDeals", + Type: "bool", + + Comment: `Prefer creating new sectors even if there are sectors Available for upgrading. +This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it +possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing +flow when the volume of storage deals is lower.`, + }, + { + Name: "MaxUpgradingSectors", + Type: "uint64", + + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)`, }, { Name: "CommittedCapacitySectorLifetime", diff --git a/node/config/types.go b/node/config/types.go index 2e9357993..b3ba36c7f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -228,12 +228,21 @@ type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 - // Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited) + // Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited) MaxSealingSectors uint64 - // Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited) + // Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited) MaxSealingSectorsForDeals uint64 + // Prefer creating new sectors even if there are sectors Available for upgrading. + // This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it + // possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing + // flow when the volume of storage deals is lower. + PreferNewSectorsForDeals bool + + // Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals) + MaxUpgradingSectors uint64 + // CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will // live before it must be extended or converted into sector containing deals before it is // terminated. Value must be between 180-540 days inclusive diff --git a/node/impl/net/net.go b/node/impl/net/net.go index 27e7734a1..fe0cf8c58 100644 --- a/node/impl/net/net.go +++ b/node/impl/net/net.go @@ -4,8 +4,10 @@ import ( "context" "sort" "strings" + "time" "go.uber.org/fx" + "golang.org/x/xerrors" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/metrics" @@ -15,6 +17,7 @@ import ( swarm "github.com/libp2p/go-libp2p-swarm" basichost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/filecoin-project/lotus/api" @@ -177,6 +180,14 @@ func (a *NetAPI) NetBandwidthStatsByPeer(ctx context.Context) (map[string]metric return out, nil } +func (a *NetAPI) NetPing(ctx context.Context, p peer.ID) (time.Duration, error) { + result, ok := <-ping.Ping(ctx, a.Host, p) + if !ok { + return 0, xerrors.Errorf("didn't get ping result: %w", ctx.Err()) + } + return result.RTT, result.Error +} + func (a *NetAPI) NetBandwidthStatsByProtocol(ctx context.Context) (map[protocol.ID]metrics.Stats, error) { return a.Reporter.GetBandwidthByProtocol(), nil } diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index caff47676..2d7a5c181 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -916,6 +916,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals, + MaxUpgradingSectors: cfg.MaxUpgradingSectors, CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, @@ -954,6 +956,8 @@ func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.Se MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, MaxSealingSectors: sealingCfg.MaxSealingSectors, MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals, + MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors, StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer), MakeNewSectorForDeals: dealmakingCfg.MakeNewSectorForDeals, CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime),