Merge remote-tracking branch 'origin/master' into feat/post-worker

Łukasz Magiera 2022-03-25 16:38:30 -04:00
commit 82343a06e7
40 changed files with 885 additions and 235 deletions

View File

@@ -917,6 +917,11 @@ workflows:
suite: itest-sector_finalize_early
target: "./itests/sector_finalize_early_test.go"
- test:
name: test-itest-sector_make_cc_avail
suite: itest-sector_make_cc_avail
target: "./itests/sector_make_cc_avail_test.go"
- test:
name: test-itest-sector_miner_collateral
suite: itest-sector_miner_collateral
@@ -927,6 +932,16 @@ workflows:
suite: itest-sector_pledge
target: "./itests/sector_pledge_test.go"
- test:
name: test-itest-sector_prefer_no_upgrade
suite: itest-sector_prefer_no_upgrade
target: "./itests/sector_prefer_no_upgrade_test.go"
- test:
name: test-itest-sector_revert_available
suite: itest-sector_revert_available
target: "./itests/sector_revert_available_test.go"
- test:
name: test-itest-sector_terminate
suite: itest-sector_terminate

View File

@@ -2,6 +2,7 @@ package api
import (
"context"
"time"
metrics "github.com/libp2p/go-libp2p-core/metrics"
"github.com/libp2p/go-libp2p-core/network"
@@ -25,6 +26,7 @@ type Net interface {
NetConnectedness(context.Context, peer.ID) (network.Connectedness, error) //perm:read
NetPeers(context.Context) ([]peer.AddrInfo, error) //perm:read
NetPing(context.Context, peer.ID) (time.Duration, error) //perm:read
NetConnect(context.Context, peer.AddrInfo) error //perm:write
NetAddrsListen(context.Context) (peer.AddrInfo, error) //perm:read
NetDisconnect(context.Context, peer.ID) error //perm:write

View File

@@ -8,6 +8,7 @@ import (
context "context"
json "encoding/json"
reflect "reflect"
time "time"
address "github.com/filecoin-project/go-address"
bitfield "github.com/filecoin-project/go-bitfield"
@@ -1856,6 +1857,21 @@ func (mr *MockFullNodeMockRecorder) NetPeers(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPeers", reflect.TypeOf((*MockFullNode)(nil).NetPeers), arg0)
}
// NetPing mocks base method.
func (m *MockFullNode) NetPing(arg0 context.Context, arg1 peer.ID) (time.Duration, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NetPing", arg0, arg1)
ret0, _ := ret[0].(time.Duration)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// NetPing indicates an expected call of NetPing.
func (mr *MockFullNodeMockRecorder) NetPing(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPing", reflect.TypeOf((*MockFullNode)(nil).NetPing), arg0, arg1)
}
// NetProtectAdd mocks base method.
func (m *MockFullNode) NetProtectAdd(arg0 context.Context, arg1 []peer.ID) error {
m.ctrl.T.Helper()

View File

@@ -597,6 +597,8 @@ type NetStruct struct {
NetPeers func(p0 context.Context) ([]peer.AddrInfo, error) `perm:"read"`
NetPing func(p0 context.Context, p1 peer.ID) (time.Duration, error) `perm:"read"`
NetProtectAdd func(p0 context.Context, p1 []peer.ID) error `perm:"admin"`
NetProtectList func(p0 context.Context) ([]peer.ID, error) `perm:"read"`
@@ -3712,6 +3714,17 @@ func (s *NetStub) NetPeers(p0 context.Context) ([]peer.AddrInfo, error) {
return *new([]peer.AddrInfo), ErrNotSupported
}
func (s *NetStruct) NetPing(p0 context.Context, p1 peer.ID) (time.Duration, error) {
if s.Internal.NetPing == nil {
return *new(time.Duration), ErrNotSupported
}
return s.Internal.NetPing(p0, p1)
}
func (s *NetStub) NetPing(p0 context.Context, p1 peer.ID) (time.Duration, error) {
return *new(time.Duration), ErrNotSupported
}
func (s *NetStruct) NetProtectAdd(p0 context.Context, p1 []peer.ID) error {
if s.Internal.NetProtectAdd == nil {
return ErrNotSupported

View File

@@ -0,0 +1,64 @@
package main
import (
"encoding/gob"
"flag"
"fmt"
"os"
"path"
"reflect"
"github.com/golang/mock/mockgen/model"
pkg_ "github.com/filecoin-project/lotus/api/v0api"
)
var output = flag.String("output", "", "The output file name, or empty to use stdout.")
func main() {
flag.Parse()
its := []struct {
sym string
typ reflect.Type
}{
{"FullNode", reflect.TypeOf((*pkg_.FullNode)(nil)).Elem()},
}
pkg := &model.Package{
// NOTE: This behaves contrary to documented behaviour if the
// package name is not the final component of the import path.
// The reflect package doesn't expose the package name, though.
Name: path.Base("github.com/filecoin-project/lotus/api/v0api"),
}
for _, it := range its {
intf, err := model.InterfaceFromInterfaceType(it.typ)
if err != nil {
fmt.Fprintf(os.Stderr, "Reflection: %v\n", err)
os.Exit(1)
}
intf.Name = it.sym
pkg.Interfaces = append(pkg.Interfaces, intf)
}
outfile := os.Stdout
if len(*output) != 0 {
var err error
outfile, err = os.Create(*output)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to open output file %q", *output)
}
defer func() {
if err := outfile.Close(); err != nil {
fmt.Fprintf(os.Stderr, "failed to close output file %q", *output)
os.Exit(1)
}
}()
}
if err := gob.NewEncoder(outfile).Encode(pkg); err != nil {
fmt.Fprintf(os.Stderr, "gob encode: %v\n", err)
os.Exit(1)
}
}

View File

@@ -7,6 +7,7 @@ package v0mocks
import (
context "context"
reflect "reflect"
time "time"
address "github.com/filecoin-project/go-address"
bitfield "github.com/filecoin-project/go-bitfield"
@@ -1769,6 +1770,21 @@ func (mr *MockFullNodeMockRecorder) NetPeers(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPeers", reflect.TypeOf((*MockFullNode)(nil).NetPeers), arg0)
}
// NetPing mocks base method.
func (m *MockFullNode) NetPing(arg0 context.Context, arg1 peer.ID) (time.Duration, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NetPing", arg0, arg1)
ret0, _ := ret[0].(time.Duration)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// NetPing indicates an expected call of NetPing.
func (mr *MockFullNodeMockRecorder) NetPing(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NetPing", reflect.TypeOf((*MockFullNode)(nil).NetPing), arg0, arg1)
}
// NetProtectAdd mocks base method.
func (m *MockFullNode) NetProtectAdd(arg0 context.Context, arg1 []peer.ID) error {
m.ctrl.T.Helper()

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -5,12 +5,12 @@ import (
"errors"
"fmt"
"github.com/filecoin-project/lotus/blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/go-address"
@@ -223,7 +223,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
return nil, err
}
buffStore := blockstore.NewBuffered(sm.cs.StateBlockstore())
buffStore := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
vmopt := &vm.VMOpts{
StateBase: stateCid,
Epoch: vmHeight,

View File

@@ -217,6 +217,7 @@ type FVM struct {
}
func NewFVM(ctx context.Context, opts *VMOpts) (*FVM, error) {
log.Info("using the FVM, this is experimental!")
circToReport := opts.FilVested
// For v14 (and earlier), we perform the FilVested portion of the calculation, and let the FVM dynamically do the rest
// v15 and after, the circ supply is always constant per epoch, so we calculate the base and report it at creation

View File

@@ -161,6 +161,7 @@ var msigCreateCmd = &cli.Command{
msgCid := sm.Cid()
fmt.Println("sent create in message: ", msgCid)
fmt.Println("waiting for confirmation..")
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, msgCid, uint64(cctx.Int("confidence")), build.Finality, true)

View File

@@ -1,12 +1,14 @@
package cli
import (
"context"
"encoding/json"
"fmt"
"os"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/dustin/go-humanize"
"github.com/urfave/cli/v2"
@@ -28,6 +30,7 @@ var NetCmd = &cli.Command{
Usage: "Manage P2P Network",
Subcommands: []*cli.Command{
NetPeers,
NetPing,
NetConnect,
NetListen,
NetId,
@@ -117,6 +120,82 @@ var NetPeers = &cli.Command{
},
}
var NetPing = &cli.Command{
Name: "ping",
Usage: "Ping peers",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "count",
Value: 10,
Aliases: []string{"c"},
Usage: "specify the number of times it should ping",
},
&cli.DurationFlag{
Name: "interval",
Value: time.Second,
Aliases: []string{"i"},
Usage: "minimum time between pings",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("please provide a peerID")
}
api, closer, err := GetAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
pis, err := addrInfoFromArg(ctx, cctx)
if err != nil {
return err
}
count := cctx.Int("count")
interval := cctx.Duration("interval")
for _, pi := range pis {
err := api.NetConnect(ctx, pi)
if err != nil {
return xerrors.Errorf("connect: %w", err)
}
fmt.Printf("PING %s\n", pi.ID)
var avg time.Duration
var successful int
for i := 0; i < count && ctx.Err() == nil; i++ {
start := time.Now()
rtt, err := api.NetPing(ctx, pi.ID)
if err != nil {
if ctx.Err() != nil {
break
}
log.Errorf("Ping failed: error=%v", err)
continue
}
fmt.Printf("Pong received: time=%v\n", rtt)
avg = avg + rtt
successful++
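// pace the loop: wait out whatever remains of `interval` since this ping
// started, waking early if the command context is cancelled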
wctx, cancel := context.WithTimeout(ctx, time.Until(start.Add(interval)))
<-wctx.Done()
cancel()
}
if successful > 0 {
fmt.Printf("Average latency: %v\n", avg/time.Duration(successful))
}
}
return nil
},
}
var NetScores = &cli.Command{
Name: "scores",
Usage: "Print peers' pubsub scores",
@@ -192,45 +271,9 @@ var NetConnect = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice())
pis, err := addrInfoFromArg(ctx, cctx)
if err != nil {
a, perr := address.NewFromString(cctx.Args().First())
if perr != nil {
return err
}
na, fc, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer fc()
mi, err := na.StateMinerInfo(ctx, a, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner info: %w", err)
}
if mi.PeerId == nil {
return xerrors.Errorf("no PeerID for miner")
}
multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs))
for i, a := range mi.Multiaddrs {
maddr, err := multiaddr.NewMultiaddrBytes(a)
if err != nil {
log.Warnf("parsing multiaddr %d (%x): %s", i, a, err)
continue
}
multiaddrs = append(multiaddrs, maddr)
}
pi := peer.AddrInfo{
ID: *mi.PeerId,
Addrs: multiaddrs,
}
fmt.Printf("%s -> %s\n", a, pi)
pis = append(pis, pi)
return err
}
for _, pi := range pis {
@@ -247,6 +290,51 @@ var NetConnect = &cli.Command{
},
}
func addrInfoFromArg(ctx context.Context, cctx *cli.Context) ([]peer.AddrInfo, error) {
pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice())
if err != nil {
a, perr := address.NewFromString(cctx.Args().First())
if perr != nil {
return nil, err
}
na, fc, err := GetFullNodeAPI(cctx)
if err != nil {
return nil, err
}
defer fc()
mi, err := na.StateMinerInfo(ctx, a, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting miner info: %w", err)
}
if mi.PeerId == nil {
return nil, xerrors.Errorf("no PeerID for miner")
}
multiaddrs := make([]multiaddr.Multiaddr, 0, len(mi.Multiaddrs))
for i, a := range mi.Multiaddrs {
maddr, err := multiaddr.NewMultiaddrBytes(a)
if err != nil {
log.Warnf("parsing multiaddr %d (%x): %s", i, a, err)
continue
}
multiaddrs = append(multiaddrs, maddr)
}
pi := peer.AddrInfo{
ID: *mi.PeerId,
Addrs: multiaddrs,
}
fmt.Printf("%s -> %s\n", a, pi)
pis = append(pis, pi)
err = nil
}
return pis, err
}
var NetId = &cli.Command{
Name: "id",
Usage: "Get node identity",

View File

@@ -212,6 +212,7 @@ var setAskCmd = &cli.Command{
Name: "max-piece-size",
Usage: "Set maximum piece size (w/bit-padding, in bytes) in ask to `SIZE`",
DefaultText: "miner sector size",
Value: "0",
},
},
Action: func(cctx *cli.Context) error {

View File

@@ -365,6 +365,10 @@ var provingCheckProvableCmd = &cli.Command{
Name: "storage-id",
Usage: "filter sectors by storage path (path id)",
},
&cli.BoolFlag{
Name: "faulty",
Usage: "only check faulty sectors",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
@@ -376,7 +380,7 @@ var provingCheckProvableCmd = &cli.Command{
return xerrors.Errorf("could not parse deadline index: %w", err)
}
api, closer, err := lcli.GetFullNodeAPI(cctx)
api, closer, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
@@ -428,6 +432,38 @@ var provingCheckProvableCmd = &cli.Command{
}
}
if cctx.Bool("faulty") {
parts, err := getAllPartitions(ctx, addr, api)
if err != nil {
return xerrors.Errorf("getting partitions: %w", err)
}
if filter != nil {
for k := range filter {
set, err := parts.FaultySectors.IsSet(uint64(k.Number))
if err != nil {
return err
}
if !set {
delete(filter, k)
}
}
} else {
filter = map[abi.SectorID]struct{}{}
err = parts.FaultySectors.ForEach(func(s uint64) error {
filter[abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(s),
}] = struct{}{}
return nil
})
if err != nil {
return err
}
}
}
for parIdx, par := range partitions {
sectors := make(map[abi.SectorNumber]struct{})

View File

@@ -22,6 +22,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
@@ -557,7 +558,7 @@ var storageListSectorsCmd = &cli.Command{
}
defer closer()
napi, closer2, err := lcli.GetFullNodeAPI(cctx)
napi, closer2, err := lcli.GetFullNodeAPIV1(cctx)
if err != nil {
return err
}
@@ -592,6 +593,11 @@ var storageListSectorsCmd = &cli.Command{
}
}
allParts, err := getAllPartitions(ctx, maddr, napi)
if err != nil {
return xerrors.Errorf("getting partition states: %w", err)
}
type entry struct {
id abi.SectorNumber
storage storiface.ID
@@ -601,6 +607,8 @@ var storageListSectorsCmd = &cli.Command{
primary, copy, main, seal, store bool
state api.SectorState
faulty bool
}
var list []entry
@@ -610,6 +618,10 @@ var storageListSectorsCmd = &cli.Command{
if err != nil {
return xerrors.Errorf("getting sector status for sector %d: %w", sector, err)
}
fault, err := allParts.FaultySectors.IsSet(uint64(sector))
if err != nil {
return xerrors.Errorf("checking if sector is faulty: %w", err)
}
for _, ft := range storiface.PathTypes {
si, err := nodeApi.StorageFindSector(ctx, sid(sector), ft, mi.SectorSize, false)
@@ -632,7 +644,8 @@ var storageListSectorsCmd = &cli.Command{
seal: info.CanSeal,
store: info.CanStore,
state: st.State,
state: st.State,
faulty: fault,
})
}
}
@@ -660,6 +673,7 @@ var storageListSectorsCmd = &cli.Command{
tablewriter.Col("Sector"),
tablewriter.Col("Type"),
tablewriter.Col("State"),
tablewriter.Col("Faulty"),
tablewriter.Col("Primary"),
tablewriter.Col("Path use"),
tablewriter.Col("URLs"),
@@ -687,6 +701,10 @@ var storageListSectorsCmd = &cli.Command{
"Path use": maybeStr(e.seal, color.FgMagenta, "seal ") + maybeStr(e.store, color.FgCyan, "store"),
"URLs": e.urls,
}
if e.faulty {
// only set when there is a fault, so the column is hidden with no faults
m["Faulty"] = color.RedString("faulty")
}
tw.Write(m)
}
@@ -694,6 +712,52 @@ var storageListSectorsCmd = &cli.Command{
},
}
func getAllPartitions(ctx context.Context, maddr address.Address, napi api.FullNode) (api.Partition, error) {
deadlines, err := napi.StateMinerDeadlines(ctx, maddr, types.EmptyTSK)
if err != nil {
return api.Partition{}, xerrors.Errorf("getting deadlines: %w", err)
}
out := api.Partition{
AllSectors: bitfield.New(),
FaultySectors: bitfield.New(),
RecoveringSectors: bitfield.New(),
LiveSectors: bitfield.New(),
ActiveSectors: bitfield.New(),
}
for dlIdx := range deadlines {
partitions, err := napi.StateMinerPartitions(ctx, maddr, uint64(dlIdx), types.EmptyTSK)
if err != nil {
return api.Partition{}, xerrors.Errorf("getting partitions for deadline %d: %w", dlIdx, err)
}
for _, partition := range partitions {
out.AllSectors, err = bitfield.MergeBitFields(out.AllSectors, partition.AllSectors)
if err != nil {
return api.Partition{}, err
}
out.FaultySectors, err = bitfield.MergeBitFields(out.FaultySectors, partition.FaultySectors)
if err != nil {
return api.Partition{}, err
}
out.RecoveringSectors, err = bitfield.MergeBitFields(out.RecoveringSectors, partition.RecoveringSectors)
if err != nil {
return api.Partition{}, err
}
out.LiveSectors, err = bitfield.MergeBitFields(out.LiveSectors, partition.LiveSectors)
if err != nil {
return api.Partition{}, err
}
out.ActiveSectors, err = bitfield.MergeBitFields(out.ActiveSectors, partition.ActiveSectors)
if err != nil {
return api.Partition{}, err
}
}
}
return out, nil
}
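A sketch of consuming the merged result; countFaults is a hypothetical helper (not part of this commit), relying on Count from github.com/filecoin-project/go-bitfield:

// countFaults shows how the merged bitfields can be consumed, e.g. to count
// the sectors that are currently faulty across all deadlines (hypothetical).
func countFaults(ctx context.Context, maddr address.Address, napi api.FullNode) (uint64, error) {
	parts, err := getAllPartitions(ctx, maddr, napi)
	if err != nil {
		return 0, xerrors.Errorf("getting partitions: %w", err)
	}
	// BitField.Count comes from github.com/filecoin-project/go-bitfield
	return parts.FaultySectors.Count()
}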
func maybeStr(c bool, col color.Attribute, s string) string {
if !c {
return ""

View File

@@ -278,7 +278,7 @@ func openRepo(cctx *cli.Context) (repo.LockedRepo, types.KeyStore, error) {
return nil, nil, err
}
if !ok {
if err := r.Init(repo.Worker); err != nil {
if err := r.Init(repo.Wallet); err != nil {
return nil, nil, err
}
}

View File

@@ -88,6 +88,7 @@
* [NetLimit](#NetLimit)
* [NetPeerInfo](#NetPeerInfo)
* [NetPeers](#NetPeers)
* [NetPing](#NetPing)
* [NetProtectAdd](#NetProtectAdd)
* [NetProtectList](#NetProtectList)
* [NetProtectRemove](#NetProtectRemove)
@@ -1848,6 +1849,20 @@ Response:
]
```
### NetPing
Perms: read
Inputs:
```json
[
"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"
]
```
Response: `60000000000`
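The response is a Go `time.Duration` in nanoseconds, so `60000000000` is 60 seconds. A minimal client-side sketch of calling this method (assumptions: a node on the default local port, an API token in `LOTUS_TOKEN`, and the v1 JSON-RPC constructor `client.NewFullNodeRPCV1` from `api/client`; adjust for the API version you target):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"

	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()
	header := http.Header{"Authorization": []string{"Bearer " + os.Getenv("LOTUS_TOKEN")}}

	full, closer, err := client.NewFullNodeRPCV1(ctx, "ws://127.0.0.1:1234/rpc/v1", header)
	if err != nil {
		panic(err)
	}
	defer closer()

	pid, err := peer.Decode("12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf")
	if err != nil {
		panic(err)
	}

	// NetPing returns a time.Duration round-trip time
	rtt, err := full.NetPing(ctx, pid)
	if err != nil {
		panic(err)
	}
	fmt.Println("ping RTT:", rtt)
}
```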
### NetProtectAdd

View File

@@ -131,6 +131,7 @@
* [NetLimit](#NetLimit)
* [NetPeerInfo](#NetPeerInfo)
* [NetPeers](#NetPeers)
* [NetPing](#NetPing)
* [NetProtectAdd](#NetProtectAdd)
* [NetProtectList](#NetProtectList)
* [NetProtectRemove](#NetProtectRemove)
@@ -3908,6 +3909,20 @@ Response:
]
```
### NetPing
Perms: read
Inputs:
```json
[
"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"
]
```
Response: `60000000000`
### NetProtectAdd

View File

@@ -137,6 +137,7 @@
* [NetLimit](#NetLimit)
* [NetPeerInfo](#NetPeerInfo)
* [NetPeers](#NetPeers)
* [NetPing](#NetPing)
* [NetProtectAdd](#NetProtectAdd)
* [NetProtectList](#NetProtectList)
* [NetProtectRemove](#NetProtectRemove)
@@ -4270,6 +4271,20 @@ Response:
]
```
### NetPing
Perms: read
Inputs:
```json
[
"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"
]
```
Response: `60000000000`
### NetProtectAdd

View File

@@ -1200,6 +1200,7 @@ USAGE:
COMMANDS:
peers Print peers
ping Ping peers
connect Connect to a peer
listen List listen addresses
id Get node identity
@@ -1235,6 +1236,21 @@ OPTIONS:
```
### lotus-miner net ping
```
NAME:
lotus-miner net ping - Ping peers
USAGE:
lotus-miner net ping [command options] [arguments...]
OPTIONS:
--count value, -c value specify the number of times it should ping (default: 10)
--interval value, -i value minimum time between pings (default: 1s)
--help, -h show help (default: false)
```
### lotus-miner net connect
```
NAME:
@@ -2095,6 +2111,7 @@ OPTIONS:
--only-bad print only bad sectors (default: false)
--slow run slower checks (default: false)
--storage-id value filter sectors by storage path (path id)
--faulty only check faulty sectors (default: false)
--help, -h show help (default: false)
```

View File

@@ -2602,6 +2602,7 @@ USAGE:
COMMANDS:
peers Print peers
ping Ping peers
connect Connect to a peer
listen List listen addresses
id Get node identity
@@ -2637,6 +2638,21 @@ OPTIONS:
```
### lotus net ping
```
NAME:
lotus net ping - Ping peers
USAGE:
lotus net ping [command options] [arguments...]
OPTIONS:
--count value, -c value specify the number of times it should ping (default: 10)
--interval value, -i value minimum time between pings (default: 1s)
--help, -h show help (default: false)
```
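A hypothetical invocation (the address below is illustrative; per `addrInfoFromArg` the argument may be a multiaddr or a miner address, in which case the peer is resolved from on-chain miner info):
```
lotus net ping -c 5 -i 500ms /ip4/127.0.0.1/tcp/34649/p2p/12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf
```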
### lotus net connect
```
NAME:

View File

@@ -325,18 +325,33 @@
# env var: LOTUS_SEALING_MAXWAITDEALSSECTORS
#MaxWaitDealsSectors = 2
# Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)
# Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORS
#MaxSealingSectors = 0
# Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)
# Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)
#
# type: uint64
# env var: LOTUS_SEALING_MAXSEALINGSECTORSFORDEALS
#MaxSealingSectorsForDeals = 0
# Prefer creating new sectors even if there are sectors Available for upgrading.
# This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it
# possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing
# flow when the volume of storage deals is lower.
#
# type: bool
# env var: LOTUS_SEALING_PREFERNEWSECTORSFORDEALS
#PreferNewSectorsForDeals = false
# Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)
#
# type: uint64
# env var: LOTUS_SEALING_MAXUPGRADINGSECTORS
#MaxUpgradingSectors = 0
# CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
# live before it must be extended or converted into a sector containing deals before it is
# terminated. Value must be between 180-540 days inclusive
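How these limits compose is easiest to pin down in an integration test. A minimal sketch (illustrative values, not recommendations; TestUpgradeVsNewSectorKnobs is hypothetical) using the kit.MutateSealingConfig helper added elsewhere in this commit:

package itests

import (
	"testing"
	"time"

	"github.com/filecoin-project/lotus/itests/kit"
	"github.com/filecoin-project/lotus/node/config"
)

func TestUpgradeVsNewSectorKnobs(t *testing.T) {
	_, _, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) {
		sc.MaxSealingSectorsForDeals = 4    // cap the new-sector pipeline
		sc.MaxUpgradingSectors = 8          // but allow more concurrent snap upgrades
		sc.PreferNewSectorsForDeals = true  // fall back to upgrades only when saturated
	}))
	ens.InterconnectAll().BeginMining(50 * time.Millisecond)
}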

View File

@@ -166,8 +166,6 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)
err = nil

View File

@@ -473,6 +473,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors
})
log.Debugw("updateInput matching", "matches", len(matches), "toAssign", len(toAssign), "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
var assigned int
for _, mt := range matches {
if m.pendingPieces[mt.deal].assigned {
@@ -506,6 +508,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
}
}
log.Debugw("updateInput matching done", "matches", len(matches), "toAssign", len(toAssign), "assigned", assigned, "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
@@ -550,7 +554,7 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
return curEpoch + minDur, curEpoch + maxDur, nil
}
func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
@@ -619,56 +623,8 @@ func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSeal
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()
if m.nextDealSector != nil {
return nil // new sector is being created right now
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}
if !cfg.MakeNewSectorForDeals {
return nil
}
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
m.nextDealSector = &sid
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
})
}
// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
// Now actually create a new sector
sid, err := m.sc.Next()
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
@@ -682,7 +638,74 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
// update stats early, fsm planner would do that async
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
return sid, nil
return sid, err
}
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()
if m.nextDealSector != nil {
return nil // new sector is being created right now
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
// if we're above WaitDeals limit, we don't want to add more staging sectors
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}
maxUpgrading := cfg.MaxSealingSectorsForDeals
if cfg.MaxUpgradingSectors > 0 {
maxUpgrading = cfg.MaxUpgradingSectors
}
canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals)
canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading)
// we want to try to upgrade when:
// - we can upgrade and prefer upgrades
// - we don't prefer upgrades, but can't create a new sector
shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)
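// e.g. (hypothetical numbers): curSealing=5, MaxSealingSectorsForDeals=4,
// MaxUpgradingSectors=8, MakeNewSectorForDeals=true, PreferNewSectorsForDeals=true
// gives canCreate=false, canUpgrade=true, shouldUpgrade=true -- a saturated
// new-sector pipeline falls back to snap upgrades, matching the config docs.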
log.Infow("new deal sector decision",
"sealing", m.stats.curSealing(),
"maxSeal", cfg.MaxSealingSectorsForDeals,
"maxUpgrade", maxUpgrading,
"preferNew", cfg.PreferNewSectorsForDeals,
"canCreate", canCreate,
"canUpgrade", canUpgrade,
"shouldUpgrade", shouldUpgrade)
if shouldUpgrade {
got, err := m.maybeUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}
}
if canCreate {
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
m.nextDealSector = &sid
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
if err := m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
}); err != nil {
return err
}
}
return nil
}
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {

View File

@@ -18,6 +18,10 @@ type Config struct {
// includes failed, 0 = no limit
MaxSealingSectorsForDeals uint64
PreferNewSectorsForDeals bool
MaxUpgradingSectors uint64
MakeNewSectorForDeals bool
MakeCCSectorsAvailable bool

View File

@@ -184,14 +184,14 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
}
func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil {
return err
}
if sector.ReplicaUpdateMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), *sector.ReplicaUpdateMessage)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetrySubmitReplicaUpdateWait{})
}
@@ -248,10 +248,6 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
return ctx.Send(SectorAbortUpgrade{})
}
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorRetrySubmitReplicaUpdate{})
}

View File

@@ -16,7 +16,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
}
if si.State != Proving {
return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade")
return xerrors.Errorf("unable to snap-up sectors not in the 'Proving' state")
}
if si.hasDeals() {

View File

@@ -7,18 +7,13 @@ import (
"testing"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
)
@@ -34,7 +29,23 @@ func TestCCUpgrade(t *testing.T) {
//stm: @MINER_SECTOR_LIST_001
kit.QuietMiningLogs()
runTestCCUpgrade(t)
n := runTestCCUpgrade(t)
t.Run("post", func(t *testing.T) {
ctx := context.Background()
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()
// wait for a full proving period
t.Log("waiting for chain")
n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
if ts.Height() > start+abi.ChainEpoch(2880) {
return true
}
return false
})
})
}
func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
@@ -62,7 +73,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)
//stm: @SECTOR_CC_UPGRADE_001
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
@@ -90,105 +101,3 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
return client
}
func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
for {
active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
for _, si := range active {
if si.SectorNumber == sn {
fmt.Printf("ACTIVE\n")
return
}
}
time.Sleep(time.Second)
}
}
func TestCCUpgradeAndPoSt(t *testing.T) {
kit.QuietMiningLogs()
_ = logging.SetLogLevel("storageminer", "INFO")
t.Run("upgrade and then post", func(t *testing.T) {
ctx := context.Background()
n := runTestCCUpgrade(t)
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()
// wait for a full proving period
t.Log("waiting for chain")
n.WaitTillChain(ctx, func(ts *types.TipSet) bool {
if ts.Height() > start+abi.ChainEpoch(2880) {
return true
}
return false
})
})
}
func TestAbortUpgradeAvailable(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)
sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
ss, err := miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Proving) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Available), ss.State)
break
}
require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Available) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Proving), ss.State)
break
}
}

View File

@@ -2,7 +2,9 @@ package kit
import (
"context"
"fmt"
"testing"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
@@ -67,6 +69,21 @@ func (f *TestFullNode) WaitTillChain(ctx context.Context, pred ChainPredicate) *
return nil
}
func (f *TestFullNode) WaitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, maddr address.Address) {
for {
active, err := f.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)
for _, si := range active {
if si.SectorNumber == sn {
fmt.Printf("ACTIVE\n")
return
}
}
time.Sleep(time.Second)
}
}
// ChainPredicate encapsulates a chain condition.
type ChainPredicate func(set *types.TipSet) bool

View File

@@ -102,7 +102,7 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec
st, err := tm.StorageMiner.SectorsStatus(ctx, n, false)
require.NoError(tm.t, err)
states[st.State]++
if st.State == api.SectorState(sealing.Proving) {
if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) {
delete(toCheck, n)
}
if strings.Contains(string(st.State), "Fail") {

View File

@@ -5,6 +5,11 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
@@ -160,6 +165,17 @@ func ConstructorOpts(extra ...node.Option) NodeOpt {
}
}
func MutateSealingConfig(mut func(sc *config.SealingConfig)) NodeOpt {
return ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
mut(&cf.Sealing)
return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil
}, nil
})))
}
// SectorSize sets the sector size for this miner. Start() will populate the
// corresponding proof type depending on the network version (genesis network
// version if the Ensemble is unstarted, or the current network version

View File

@@ -9,13 +9,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
func TestDealsWithFinalizeEarly(t *testing.T) {
@@ -34,14 +29,7 @@ func TestDealsWithFinalizeEarly(t *testing.T) {
var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
cf.Sealing.FinalizeEarly = true
return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil
}, nil
})))) // no mock proofs.
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { sc.FinalizeEarly = true })) // no mock proofs.
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner, miner)

View File

@@ -0,0 +1,77 @@
package itests
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node/config"
)
func TestMakeAvailable(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) {
sc.MakeCCSectorsAvailable = true
}))
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)
sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
status, err := miner.SectorsStatus(ctx, CCUpgrade, true)
require.NoError(t, err)
assert.Equal(t, api.SectorState(sealing.Available), status.State)
dh := kit.NewDealHarness(t, client, miner, miner)
deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
Rseed: 6,
SuspendUntilCryptoeconStable: true,
})
outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false)
kit.AssertFilesEqual(t, inPath, outPath)
sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
status, err = miner.SectorsStatus(ctx, CCUpgrade, true)
require.NoError(t, err)
assert.Equal(t, 1, len(status.Deals))
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{
CCUpgrade: {},
})
}

View File

@@ -0,0 +1,89 @@
package itests
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node/config"
)
func TestPreferNoUpgrade(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) {
sc.PreferNewSectorsForDeals = true
}))
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 2)
{
miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)
sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
}
{
dh := kit.NewDealHarness(t, client, miner, miner)
deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{
Rseed: 6,
SuspendUntilCryptoeconStable: true,
})
outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false)
kit.AssertFilesEqual(t, inPath, outPath)
}
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 2, "expected 2 sectors")
{
status, err := miner.SectorsStatus(ctx, CCUpgrade, true)
require.NoError(t, err)
assert.Equal(t, api.SectorState(sealing.Available), status.State)
}
{
status, err := miner.SectorsStatus(ctx, Sealed, true)
require.NoError(t, err)
assert.Equal(t, 1, len(status.Deals))
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{
Sealed: {},
})
}
}

View File

@@ -0,0 +1,84 @@
package itests
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestAbortUpgradeAvailable(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
if err != nil {
t.Fatal(err)
}
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration))
}
client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)
err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)
sl, err = miner.SectorsList(ctx)
require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
ss, err := miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Proving) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Available), ss.State)
break
}
require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Available) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Proving), ss.State)
break
}
}

View File

@@ -714,13 +714,28 @@ Note that setting this number too high in relation to deal ingestion rate may re
Name: "MaxSealingSectors",
Type: "uint64",
Comment: `Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)`,
Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)`,
},
{
Name: "MaxSealingSectorsForDeals",
Type: "uint64",
Comment: `Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)`,
Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)`,
},
{
Name: "PreferNewSectorsForDeals",
Type: "bool",
Comment: `Prefer creating new sectors even if there are sectors Available for upgrading.
This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it
possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing
flow when the volume of storage deals is lower.`,
},
{
Name: "MaxUpgradingSectors",
Type: "uint64",
Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)`,
},
{
Name: "CommittedCapacitySectorLifetime",

View File

@@ -228,12 +228,21 @@ type SealingConfig struct {
// 0 = no limit
MaxWaitDealsSectors uint64
// Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)
// Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)
MaxSealingSectors uint64
// Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)
// Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)
MaxSealingSectorsForDeals uint64
// Prefer creating new sectors even if there are sectors Available for upgrading.
// This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it
// possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing
// flow when the volume of storage deals is lower.
PreferNewSectorsForDeals bool
// Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)
MaxUpgradingSectors uint64
// CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
// live before it must be extended or converted into a sector containing deals before it is
// terminated. Value must be between 180-540 days inclusive

View File

@@ -4,8 +4,10 @@ import (
"context"
"sort"
"strings"
"time"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/metrics"
@@ -15,6 +17,7 @@ import (
swarm "github.com/libp2p/go-libp2p-swarm"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/api"
@@ -177,6 +180,14 @@ func (a *NetAPI) NetBandwidthStatsByPeer(ctx context.Context) (map[string]metric
return out, nil
}
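// NetPing pings the peer once over the libp2p ping protocol and returns the
// first round-trip time; ping.Ping streams one Result per round until ctx ends.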
func (a *NetAPI) NetPing(ctx context.Context, p peer.ID) (time.Duration, error) {
result, ok := <-ping.Ping(ctx, a.Host, p)
if !ok {
return 0, xerrors.Errorf("didn't get ping result: %w", ctx.Err())
}
return result.RTT, result.Error
}
func (a *NetAPI) NetBandwidthStatsByProtocol(ctx context.Context) (map[protocol.ID]metrics.Stats, error) {
return a.Reporter.GetBandwidthByProtocol(), nil
}

View File

@@ -916,6 +916,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: cfg.MaxUpgradingSectors,
CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable,
@@ -954,6 +956,8 @@ func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.Se
MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors,
MaxSealingSectors: sealingCfg.MaxSealingSectors,
MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors,
StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer),
MakeNewSectorForDeals: dealmakingCfg.MakeNewSectorForDeals,
CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime),