Merge pull request #8391 from filecoin-project/feat/parallel-fault-check

feat: storage: Parallel proving checks
Łukasz Magiera 2022-03-29 16:36:12 -04:00 committed by GitHub
commit 3ce467a25f
14 changed files with 226 additions and 40 deletions


@ -467,7 +467,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
}
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
- smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
+ smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,


@ -196,7 +196,7 @@ var runCmd = &cli.Command{
&cli.IntFlag{
Name: "post-parallel-reads",
Usage: "maximum number of parallel challenge reads (0 = no limit)",
- Value: 0,
+ Value: 128,
},
&cli.DurationFlag{
Name: "post-read-timeout",


@ -50,7 +50,7 @@ OPTIONS:
--windowpost enable window post (default: false)
--winningpost enable winning post (default: false)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
- --post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 0)
+ --post-parallel-reads value maximum number of parallel challenge reads (0 = no limit) (default: 128)
--post-read-timeout value time limit for reading PoSt challenges (0 = no limit) (default: 0s)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--help, -h show help (default: false)
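
The --post-parallel-reads default shown above changed from 0 (no limit) to 128; 0 still disables the cap. Below is a minimal, self-contained sketch of the same flag pattern in the urfave/cli v2 style that the flag definitions above appear to use — the app name and action body are illustrative, not lotus-worker code:

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli/v2"
)

func main() {
	app := &cli.App{
		Name: "post-worker-demo", // illustrative name, not a lotus binary
		Flags: []cli.Flag{
			&cli.IntFlag{
				Name:  "post-parallel-reads",
				Usage: "maximum number of parallel challenge reads (0 = no limit)",
				Value: 128, // new default; previously 0 (unlimited)
			},
		},
		Action: func(cctx *cli.Context) error {
			limit := cctx.Int("post-parallel-reads")
			if limit <= 0 {
				fmt.Println("challenge reads: unlimited")
				return nil
			}
			fmt.Printf("challenge reads capped at %d\n", limit)
			return nil
		},
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}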


@ -306,6 +306,14 @@
#PurgeCacheOnStart = false
[Proving]
# Maximum number of sector checks to run in parallel. (0 = unlimited)
#
# type: int
# env var: LOTUS_PROVING_PARALLELCHECKLIMIT
#ParallelCheckLimit = 128
[Sealing]
# Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
# If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@ -484,33 +492,49 @@
[Storage]
# type: int
# env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
#ParallelFetchLimit = 10
# Local worker config
#
# type: bool
# env var: LOTUS_STORAGE_ALLOWADDPIECE
#AllowAddPiece = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT1
#AllowPreCommit1 = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWPRECOMMIT2
#AllowPreCommit2 = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWCOMMIT
#AllowCommit = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWUNSEAL
#AllowUnseal = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWREPLICAUPDATE
#AllowReplicaUpdate = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
#AllowProveReplicaUpdate2 = true
# type: bool
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true
# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".
#
# type: sectorstorage.ResourceFilteringStrategy
# env var: LOTUS_STORAGE_RESOURCEFILTERING
#ResourceFiltering = "hardware"
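
Each option in this generated section is paired with an environment variable override (for example LOTUS_PROVING_PARALLELCHECKLIMIT for ParallelCheckLimit), which lotus's config handling is expected to honor. As a rough illustration of that pattern only — not the lotus implementation — an integer option with a default and a "0 = unlimited" convention could be resolved like this:

package main

import (
	"fmt"
	"os"
	"strconv"
)

// intFromEnv is an illustrative helper (not a lotus function): it returns the
// environment variable as an int, or the given default when the variable is
// unset or unparsable.
func intFromEnv(key string, def int) int {
	v, ok := os.LookupEnv(key)
	if !ok {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return def
	}
	return n
}

func main() {
	limit := intFromEnv("LOTUS_PROVING_PARALLELCHECKLIMIT", 128)
	if limit <= 0 {
		fmt.Println("parallel sector checks: unlimited")
		return
	}
	fmt.Printf("parallel sector checks capped at %d\n", limit)
}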


@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"sync"
"time"
"golang.org/x/xerrors"
@ -24,22 +25,55 @@ type FaultTracker interface {
// CheckProvable returns unprovable sectors
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if rg == nil {
return nil, xerrors.Errorf("rg is nil")
}
var bad = make(map[abi.SectorID]string)
+ var badLk sync.Mutex
+ var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
+ _, _ = rand.Read(postRand)
+ postRand[31] &= 0x3f
+ limit := m.parallelCheckLimit
+ if limit <= 0 {
+ limit = len(sectors)
+ }
+ throttle := make(chan struct{}, limit)
+ addBad := func(s abi.SectorID, reason string) {
+ badLk.Lock()
+ bad[s] = reason
+ badLk.Unlock()
+ }
+ var wg sync.WaitGroup
+ wg.Add(len(sectors))
for _, sector := range sectors {
- err := func() error {
+ select {
+ case throttle <- struct{}{}:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ go func(sector storage.SectorRef) {
+ defer wg.Done()
+ defer func() {
+ <-throttle
+ }()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
commr, update, err := rg(ctx, sector.ID)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
- bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
- return nil
+ addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
+ return
}
toLock := storiface.FTSealed | storiface.FTCache
@ -49,31 +83,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
if err != nil {
- return xerrors.Errorf("acquiring sector lock: %w", err)
+ addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
+ return
}
if !locked {
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
- bad[sector.ID] = fmt.Sprint("can't acquire read lock")
- return nil
+ addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
+ return
}
wpp, err := sector.ProofType.RegisteredWindowPoStProof()
if err != nil {
- return err
+ addBad(sector.ID, fmt.Sprint("can't get proof type"))
+ return
}
- var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
- _, _ = rand.Read(pr)
- pr[31] &= 0x3f
- ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
+ ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, postRand, []abi.SectorNumber{
sector.ID.Number,
})
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
- bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
- return nil
+ addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
+ return
}
vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout)
@ -88,17 +120,14 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
}, wpp)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
- bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
- return nil
+ addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
+ return
}
- return nil
- }()
- if err != nil {
- return nil, err
- }
+ }(sector)
}
+ wg.Wait()
return bad, nil
}
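
This file is the core of the PR: each sector check now runs in its own goroutine, a buffered channel sized from ParallelCheckLimit acts as a semaphore bounding concurrency (with 0 falling back to checking everything at once), a sync.WaitGroup joins the workers, and a mutex-guarded map collects the bad sectors. Below is a minimal, self-contained sketch of that throttling pattern — checkSector and checkAll are illustrative stand-ins, not lotus functions:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// checkSector stands in for the per-sector work (commR lookup, storage lock,
// challenge generation, vanilla proof); it is illustrative, not lotus code.
func checkSector(ctx context.Context, id int) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulate the check
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// checkAll mirrors the concurrency shape of the new CheckProvable: a buffered
// channel bounds parallelism, a WaitGroup joins the goroutines, and a mutex
// guards the shared map of bad sectors.
func checkAll(ctx context.Context, sectors []int, limit int) (map[int]string, error) {
	if limit <= 0 {
		limit = len(sectors) // 0 = unlimited, as with ParallelCheckLimit
	}
	throttle := make(chan struct{}, limit)

	bad := make(map[int]string)
	var badLk sync.Mutex

	var wg sync.WaitGroup
	for _, s := range sectors {
		// Acquire a slot before spawning, so at most `limit` checks run at once.
		select {
		case throttle <- struct{}{}:
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		wg.Add(1)
		go func(s int) {
			defer wg.Done()
			defer func() { <-throttle }() // release the slot

			if err := checkSector(ctx, s); err != nil {
				badLk.Lock()
				bad[s] = err.Error()
				badLk.Unlock()
			}
		}(s)
	}

	wg.Wait()
	return bad, nil
}

func main() {
	bad, err := checkAll(context.Background(), []int{1, 2, 3, 4, 5}, 2)
	fmt.Println(bad, err)
}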


@ -70,6 +70,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore
parallelCheckLimit int
callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
callRes map[storiface.CallID]chan result
@ -99,7 +101,7 @@ const (
ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)
- type SealerConfig struct {
+ type Config struct {
ParallelFetchLimit int
// Local worker config
@ -116,6 +118,9 @@ type SealerConfig struct {
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering ResourceFilteringStrategy
// PoSt config
ParallelCheckLimit int
}
type StorageAuth http.Header
@ -123,7 +128,7 @@ type StorageAuth http.Header
type WorkerStateStore *statestore.StateStore
type ManagerStateStore *statestore.StateStore
- func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
+ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
if err != nil {
return nil, xerrors.Errorf("creating prover instance: %w", err)
@ -142,6 +147,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
localProver: prover,
parallelCheckLimit: sc.ParallelCheckLimit,
work: mss,
callToWork: map[storiface.CallID]WorkID{},
callRes: map[storiface.CallID]chan result{},


@ -30,7 +30,7 @@ import (
// only uses miner and does NOT use any remote worker.
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
// Set up sector storage manager
- sealerCfg := SealerConfig{
+ sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
@ -89,7 +89,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
// miner's worker can only add pieces to an unsealed sector.
- sealerCfg := SealerConfig{
+ sealerCfg := Config{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: false,
@ -198,7 +198,7 @@ func generatePieceData(size uint64) []byte {
return bz
}
- func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
+ func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
ctx := context.Background()
// listen on tcp socket to create an http server later
address := "0.0.0.0:0"


@ -585,7 +585,7 @@ func (n *Ensemble) Start() *Ensemble {
// disable resource filtering so that local worker gets assigned tasks
// regardless of system pressure.
- node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
+ node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
scfg := config.DefaultStorageMiner()
if noLocal {
@ -596,7 +596,7 @@ func (n *Ensemble) Start() *Ensemble {
}
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
- return scfg.Storage
+ return scfg.StorageManager()
}),
// upgrades


@ -214,7 +214,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
),
- Override(new(sectorstorage.SealerConfig), cfg.Storage),
+ Override(new(sectorstorage.Config), cfg.StorageManager()),
Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
)
}


@ -138,7 +138,11 @@ func DefaultStorageMiner() *StorageMiner {
TerminateBatchWait: Duration(5 * time.Minute),
},
- Storage: sectorstorage.SealerConfig{
+ Proving: ProvingConfig{
+ ParallelCheckLimit: 128,
+ },
+ Storage: SealerConfig{
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,


@ -620,6 +620,14 @@ over the worker address if this flag is set.`,
Comment: ``,
},
},
"ProvingConfig": []DocField{
{
Name: "ParallelCheckLimit",
Type: "int",
Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
},
},
"Pubsub": []DocField{
{
Name: "Bootstrapper",
@ -691,6 +699,70 @@ default value is true`,
This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external".`,
},
},
"SealerConfig": []DocField{
{
Name: "ParallelFetchLimit",
Type: "int",
Comment: ``,
},
{
Name: "AllowAddPiece",
Type: "bool",
Comment: `Local worker config`,
},
{
Name: "AllowPreCommit1",
Type: "bool",
Comment: ``,
},
{
Name: "AllowPreCommit2",
Type: "bool",
Comment: ``,
},
{
Name: "AllowCommit",
Type: "bool",
Comment: ``,
},
{
Name: "AllowUnseal",
Type: "bool",
Comment: ``,
},
{
Name: "AllowReplicaUpdate",
Type: "bool",
Comment: ``,
},
{
Name: "AllowProveReplicaUpdate2",
Type: "bool",
Comment: ``,
},
{
Name: "AllowRegenSectorKey",
Type: "bool",
Comment: ``,
},
{
Name: "ResourceFiltering",
Type: "sectorstorage.ResourceFilteringStrategy",
Comment: `ResourceFiltering instructs the system which resource filtering strategy
to use when evaluating tasks against this worker. An empty value defaults
to "hardware".`,
},
},
"SealingConfig": []DocField{
{
Name: "MaxWaitDealsSectors",
@ -933,6 +1005,12 @@ Default is 20 (about once a week).`,
Comment: ``,
},
{
Name: "Proving",
Type: "ProvingConfig",
Comment: ``,
},
{
Name: "Sealing",
Type: "SealingConfig",
@ -941,7 +1019,7 @@ Default is 20 (about once a week).`,
},
{
Name: "Storage",
- Type: "sectorstorage.SealerConfig",
+ Type: "SealerConfig",
Comment: ``,
},


@ -8,6 +8,7 @@ import (
"golang.org/x/xerrors"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)
@ -49,3 +50,20 @@ func WriteStorageFile(path string, config stores.StorageConfig) error {
return nil
}
func (c *StorageMiner) StorageManager() sectorstorage.Config {
return sectorstorage.Config{
ParallelFetchLimit: c.Storage.ParallelFetchLimit,
AllowAddPiece: c.Storage.AllowAddPiece,
AllowPreCommit1: c.Storage.AllowPreCommit1,
AllowPreCommit2: c.Storage.AllowPreCommit2,
AllowCommit: c.Storage.AllowCommit,
AllowUnseal: c.Storage.AllowUnseal,
AllowReplicaUpdate: c.Storage.AllowReplicaUpdate,
AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
AllowRegenSectorKey: c.Storage.AllowRegenSectorKey,
ResourceFiltering: c.Storage.ResourceFiltering,
ParallelCheckLimit: c.Proving.ParallelCheckLimit,
}
}
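
StorageManager() is the glue for the config split: the user-facing [Storage] section is now a config-package SealerConfig rather than sectorstorage.SealerConfig, and this helper flattens it together with the new [Proving] section into the sectorstorage.Config that the manager constructor takes. Below is a simplified, self-contained sketch of that flattening — the struct definitions are trimmed stand-ins, not the real lotus types:

package main

import "fmt"

// Trimmed stand-ins for the node-level config sections; the real lotus structs
// have many more fields.
type ProvingConfig struct {
	ParallelCheckLimit int // 0 = unlimited
}

type SealerConfig struct {
	ParallelFetchLimit int
	AllowAddPiece      bool
}

// Flat config consumed by the sector-storage manager, in the spirit of
// sectorstorage.Config.
type ManagerConfig struct {
	ParallelFetchLimit int
	AllowAddPiece      bool
	ParallelCheckLimit int
}

type StorageMiner struct {
	Proving ProvingConfig
	Storage SealerConfig
}

// StorageManager flattens the two sections into the single struct the manager
// constructor takes, like (*StorageMiner).StorageManager() above.
func (c *StorageMiner) StorageManager() ManagerConfig {
	return ManagerConfig{
		ParallelFetchLimit: c.Storage.ParallelFetchLimit,
		AllowAddPiece:      c.Storage.AllowAddPiece,
		ParallelCheckLimit: c.Proving.ParallelCheckLimit,
	}
}

func main() {
	cfg := StorageMiner{
		Proving: ProvingConfig{ParallelCheckLimit: 128},
		Storage: SealerConfig{ParallelFetchLimit: 10, AllowAddPiece: true},
	}
	fmt.Printf("%+v\n", cfg.StorageManager())
}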


@ -1,10 +1,9 @@
package config
import (
- "github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/types"
- sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
+ "github.com/ipfs/go-cid"
)
// // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE
@ -53,8 +52,9 @@ type StorageMiner struct {
Subsystems MinerSubsystemConfig
Dealmaking DealmakingConfig
IndexProvider IndexProviderConfig
+ Proving ProvingConfig
Sealing SealingConfig
- Storage sectorstorage.SealerConfig
+ Storage SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
@ -216,6 +216,13 @@ type RetrievalPricingDefault struct {
VerifiedDealsFreeTransfer bool
}
type ProvingConfig struct {
// Maximum number of sector checks to run in parallel. (0 = unlimited)
ParallelCheckLimit int
// todo disable builtin post
}
type SealingConfig struct {
// Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
// If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@ -307,6 +314,25 @@ type SealingConfig struct {
// todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
}
type SealerConfig struct {
ParallelFetchLimit int
// Local worker config
AllowAddPiece bool
AllowPreCommit1 bool
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool
AllowReplicaUpdate bool
AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".
ResourceFiltering sectorstorage.ResourceFilteringStrategy
}
type BatchFeeConfig struct {
Base types.FIL
PerSector types.FIL


@ -739,11 +739,11 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
return stores.NewLocal(ctx, ls, si, urls)
}
- func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
+ func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.Config) *stores.Remote {
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
}
- func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
+ func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.Config, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc)
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))