Merge pull request #2285 from filecoin-project/asr/timer

Update storage-FSM, add API to set sector seal delay
This commit is contained in:
Łukasz Magiera 2020-07-07 21:43:14 +02:00 committed by GitHub
commit c97af35176
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 111 additions and 29 deletions

View File

@ -3,15 +3,14 @@ package api
import (
"bytes"
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"time"
)
// StorageMiner is a low-level interface to the Filecoin network storage miner node
@ -35,9 +34,15 @@ type StorageMiner interface {
SectorsRefs(context.Context) (map[string][]SealedRef, error)
// SectorStartSealing can be called on sectors in Empty on WaitDeals states
// SectorStartSealing can be called on sectors in Empty or WaitDeals states
// to trigger sealing early
SectorStartSealing(context.Context, abi.SectorNumber) error
// SectorSetSealDelay sets the time that a newly-created sector
// waits for more deals before it starts sealing
SectorSetSealDelay(context.Context, time.Duration) error
// SectorGetSealDelay gets the time that a newly-created sector
// waits for more deals before it starts sealing
SectorGetSealDelay(context.Context) (time.Duration, error)
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
SectorRemove(context.Context, abi.SectorNumber) error
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error

View File

@ -2,15 +2,14 @@ package apistruct
import (
"context"
"io"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"io"
"time"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
@ -206,6 +205,8 @@ type StorageMinerStruct struct {
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
@ -798,6 +799,14 @@ func (c *StorageMinerStruct) SectorStartSealing(ctx context.Context, number abi.
return c.Internal.SectorStartSealing(ctx, number)
}
func (c *StorageMinerStruct) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
return c.Internal.SectorSetSealDelay(ctx, delay)
}
func (c *StorageMinerStruct) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
return c.Internal.SectorGetSealDelay(ctx)
}
func (c *StorageMinerStruct) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return c.Internal.SectorsUpdate(ctx, id, state)
}

View File

@ -30,6 +30,7 @@ var sectorsCmd = &cli.Command{
sectorsRemoveCmd,
sectorsMarkForUpgradeCmd,
sectorsStartSealCmd,
sectorsSealDelayCmd,
},
}
@ -293,6 +294,32 @@ var sectorsStartSealCmd = &cli.Command{
},
}
var sectorsSealDelayCmd = &cli.Command{
Name: "set-seal-delay",
Usage: "Set the time, in minutes, that a new sector waits for deals before sealing starts",
ArgsUsage: "<minutes>",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Args().Len() != 1 {
return xerrors.Errorf("must pass duration in minutes")
}
hs, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
if err != nil {
return xerrors.Errorf("could not parse sector number: %w", err)
}
delay := hs * uint64(time.Minute)
return nodeApi.SectorSetSealDelay(ctx, time.Duration(delay))
},
}
var sectorsUpdateCmd = &cli.Command{
Name: "update-state",
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",

2
go.mod
View File

@ -31,7 +31,7 @@ require (
github.com/filecoin-project/sector-storage v0.0.0-20200701092105-a2de752a3324
github.com/filecoin-project/specs-actors v0.7.1
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9
github.com/filecoin-project/storage-fsm v0.0.0-20200707191927-b838ed4e859e
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-kit/kit v0.10.0
github.com/go-ole/go-ole v1.2.4 // indirect

4
go.sum
View File

@ -268,8 +268,8 @@ github.com/filecoin-project/specs-storage v0.1.0 h1:PkDgTOT5W5Ao7752onjDl4QSv+sg
github.com/filecoin-project/specs-storage v0.1.0/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea h1:iixjULRQFPn7Q9KlIqfwLJnlAXO10bbkI+xy5GKGdLY=
github.com/filecoin-project/specs-storage v0.1.1-0.20200622113353-88a9704877ea/go.mod h1:Pr5ntAaxsh+sLG/LYiL4tKzvA83Vk5vLODYhfNwOg7k=
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9 h1:X6TkCA+aT0TJxjL8S8agEVjqHBVgIe9WrvdHlYcNW3M=
github.com/filecoin-project/storage-fsm v0.0.0-20200701221241-171e0d0e4bf9/go.mod h1:SXO4VnXG056B/lXHL8HZv54eMqlsyynm+v93BlLwlOY=
github.com/filecoin-project/storage-fsm v0.0.0-20200707191927-b838ed4e859e h1:touwKs24SmZ1RCQzzW24jgbTHTKPKWF7evZTudQ4Z5g=
github.com/filecoin-project/storage-fsm v0.0.0-20200707191927-b838ed4e859e/go.mod h1:SXO4VnXG056B/lXHL8HZv54eMqlsyynm+v93BlLwlOY=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=

View File

@ -323,6 +323,8 @@ func Online() Option {
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc),
Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc),
),
)
}

View File

@ -31,6 +31,7 @@ type StorageMiner struct {
Dealmaking DealmakingConfig
Storage sectorstorage.SealerConfig
SealingDelay Duration
}
type DealmakingConfig struct {
@ -132,6 +133,8 @@ func DefaultStorageMiner() *StorageMiner {
ConsiderOfflineRetrievalDeals: true,
PieceCidBlocklist: []cid.Cid{},
},
SealingDelay: Duration(time.Hour),
}
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"

View File

@ -3,12 +3,12 @@ package impl
import (
"context"
"encoding/json"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"net/http"
"os"
"strconv"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"time"
"github.com/filecoin-project/go-address"
storagemarket "github.com/filecoin-project/go-fil-markets/storagemarket"
@ -53,6 +53,8 @@ type StorageMinerAPI struct {
SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc
ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc
SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
SetSealingDelayFunc dtypes.SetSealingDelayFunc
GetSealingDelayFunc dtypes.GetSealingDelayFunc
}
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
@ -181,6 +183,14 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se
return sm.Miner.StartPackingSector(number)
}
func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
return sm.SetSealingDelayFunc(delay)
}
func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
return sm.GetSealingDelayFunc()
}
func (sm *StorageMinerAPI) SectorsUpdate(ctx context.Context, id abi.SectorNumber, state api.SectorState) error {
return sm.Miner.ForceSectorState(ctx, id, sealing.SectorState(state))
}

View File

@ -1,10 +1,10 @@
package dtypes
import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/ipfs/go-cid"
"time"
)
type MinerAddress address.Address
@ -50,3 +50,9 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error)
// SetConsiderOfflineRetrievalDealsConfigFunc is a function which is used to
// disable or enable retrieval deal acceptance.
type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
// SetSealingDelay sets how long a sector waits for more deals before sealing begins.
type SetSealingDelayFunc func(time.Duration) error
// GetSealingDelay returns how long a sector waits for more deals before sealing begins.
type GetSealingDelayFunc func() (time.Duration, error)

View File

@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"net/http"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
@ -22,6 +20,8 @@ import (
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"net/http"
"time"
"github.com/filecoin-project/go-address"
dtgraphsync "github.com/filecoin-project/go-data-transfer/impl/graphsync"
@ -134,7 +134,7 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
return &sidsc{sc}
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier) (*storage.Miner, error) {
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds)
if err != nil {
return nil, err
@ -157,7 +157,7 @@ func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h
return nil, err
}
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif)
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd)
if err != nil {
return nil, err
}
@ -525,6 +525,24 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se
}, nil
}
func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) {
return func(delay time.Duration) (err error) {
err = mutateCfg(r, func(cfg *config.StorageMiner) {
cfg.SealingDelay = config.Duration(delay)
})
return
}, nil
}
func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) {
return func() (out time.Duration, err error) {
err = readCfg(r, func(cfg *config.StorageMiner) {
out = time.Duration(cfg.SealingDelay)
})
return
}, nil
}
func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error {
raw, err := r.Config()
if err != nil {

View File

@ -41,6 +41,7 @@ type Miner struct {
maddr address.Address
worker address.Address
getSealDelay dtypes.GetSealingDelayFunc
sealing *sealing.Sealing
}
@ -77,7 +78,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*Miner, error) {
m := &Miner{
api: api,
h: h,
@ -88,6 +89,7 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
maddr: maddr,
worker: worker,
getSealDelay: gsd,
}
return m, nil
@ -106,7 +108,7 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api)
adaptedAPI := NewSealingAPIAdapter(m.api)
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp)
m.sealing = sealing.New(adaptedAPI, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay))
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function