feat: storage: Parallel proving checks

Łukasz Magiera 2022-03-28 21:19:11 -04:00
parent d502eeba2b
commit ebd34f1884
12 changed files with 218 additions and 38 deletions

View File

@@ -467,7 +467,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
     }

     stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})

-    smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
+    smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.Config{
         ParallelFetchLimit: 10,
         AllowAddPiece:      true,
         AllowPreCommit1:    true,

View File

@@ -306,6 +306,14 @@
 #PurgeCacheOnStart = false
+
+[Proving]
+  # Maximum number of sector checks to run in parallel. (0 = unlimited)
+  #
+  # type: int
+  # env var: LOTUS_PROVING_PARALLELCHECKLIMIT
+  #ParallelCheckLimit = 0
+
 [Sealing]
   # Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
   # If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -484,33 +492,49 @@
 [Storage]
+  # type: int
   # env var: LOTUS_STORAGE_PARALLELFETCHLIMIT
   #ParallelFetchLimit = 10
+  # Local worker config
+  #
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWADDPIECE
   #AllowAddPiece = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPRECOMMIT1
   #AllowPreCommit1 = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPRECOMMIT2
   #AllowPreCommit2 = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWCOMMIT
   #AllowCommit = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWUNSEAL
   #AllowUnseal = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWREPLICAUPDATE
   #AllowReplicaUpdate = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
   #AllowProveReplicaUpdate2 = true
+  # type: bool
   # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
   #AllowRegenSectorKey = true
+  # ResourceFiltering instructs the system which resource filtering strategy
+  # to use when evaluating tasks against this worker. An empty value defaults
+  # to "hardware".
+  #
+  # type: sectorstorage.ResourceFilteringStrategy
   # env var: LOTUS_STORAGE_RESOURCEFILTERING
   #ResourceFiltering = "hardware"

View File

@@ -4,6 +4,7 @@ import (
     "context"
     "crypto/rand"
     "fmt"
+    "sync"
     "time"

     "golang.org/x/xerrors"
@@ -24,22 +25,55 @@ type FaultTracker interface {
 // CheckProvable returns unprovable sectors
 func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error) {
+    ctx, cancel := context.WithCancel(ctx)
+    defer cancel()
+
     if rg == nil {
         return nil, xerrors.Errorf("rg is nil")
     }

     var bad = make(map[abi.SectorID]string)
+    var badLk sync.Mutex
+
+    var postRand abi.PoStRandomness = make([]byte, abi.RandomnessLength)
+    _, _ = rand.Read(postRand)
+    postRand[31] &= 0x3f
+
+    limit := m.parallelCheckLimit
+    if limit <= 0 {
+        limit = len(sectors)
+    }
+    throttle := make(chan struct{}, limit)
+
+    addBad := func(s abi.SectorID, reason string) {
+        badLk.Lock()
+        bad[s] = reason
+        badLk.Unlock()
+    }
+
+    var wg sync.WaitGroup
+    wg.Add(len(sectors))

     for _, sector := range sectors {
-        err := func() error {
+        select {
+        case throttle <- struct{}{}:
+        case <-ctx.Done():
+            return nil, ctx.Err()
+        }
+
+        go func(sector storage.SectorRef) {
+            defer wg.Done()
+            defer func() {
+                <-throttle
+            }()
+
             ctx, cancel := context.WithCancel(ctx)
             defer cancel()

             commr, update, err := rg(ctx, sector.ID)
             if err != nil {
                 log.Warnw("CheckProvable Sector FAULT: getting commR", "sector", sector, "sealed", "err", err)
-                bad[sector.ID] = fmt.Sprintf("getting commR: %s", err)
-                return nil
+                addBad(sector.ID, fmt.Sprintf("getting commR: %s", err))
+                return
             }

             toLock := storiface.FTSealed | storiface.FTCache
@@ -49,31 +83,29 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
             locked, err := m.index.StorageTryLock(ctx, sector.ID, toLock, storiface.FTNone)
             if err != nil {
-                return xerrors.Errorf("acquiring sector lock: %w", err)
+                addBad(sector.ID, fmt.Sprintf("tryLock error: %s", err))
+                return
             }

             if !locked {
                 log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector)
-                bad[sector.ID] = fmt.Sprint("can't acquire read lock")
-                return nil
+                addBad(sector.ID, fmt.Sprint("can't acquire read lock"))
+                return
             }

             wpp, err := sector.ProofType.RegisteredWindowPoStProof()
             if err != nil {
-                return err
+                addBad(sector.ID, fmt.Sprint("can't get proof type"))
+                return
             }

-            var pr abi.PoStRandomness = make([]byte, abi.RandomnessLength)
-            _, _ = rand.Read(pr)
-            pr[31] &= 0x3f
-
-            ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, pr, []abi.SectorNumber{
+            ch, err := ffi.GeneratePoStFallbackSectorChallenges(wpp, sector.ID.Miner, postRand, []abi.SectorNumber{
                 sector.ID.Number,
             })
             if err != nil {
                 log.Warnw("CheckProvable Sector FAULT: generating challenges", "sector", sector, "err", err)
-                bad[sector.ID] = fmt.Sprintf("generating fallback challenges: %s", err)
-                return nil
+                addBad(sector.ID, fmt.Sprintf("generating fallback challenges: %s", err))
+                return
             }

             vctx, cancel2 := context.WithTimeout(ctx, PostCheckTimeout)
@@ -88,15 +120,10 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
             }, wpp)
             if err != nil {
                 log.Warnw("CheckProvable Sector FAULT: generating vanilla proof", "sector", sector, "err", err)
-                bad[sector.ID] = fmt.Sprintf("generating vanilla proof: %s", err)
-                return nil
+                addBad(sector.ID, fmt.Sprintf("generating vanilla proof: %s", err))
+                return
             }
-
-            return nil
-        }()
-        if err != nil {
-            return nil, err
-        }
+        }(sector)
     }

+    wg.Wait()
+
     return bad, nil
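
The hunks above are the core of the change: CheckProvable now derives the PoSt randomness once, then fans the per-sector checks out to goroutines, bounded by a buffered channel used as a counting semaphore sized from the new ParallelCheckLimit (a limit of 0 falls back to one slot per sector, i.e. effectively unlimited), with a mutex-guarded map collecting the failures. The same shape in isolation, as a minimal self-contained sketch — checkSectors, check and the integer ids here are illustrative stand-ins, not names from this commit:

package main

import (
    "fmt"
    "sync"
    "time"
)

// checkSectors runs check(id) for every id with at most limit checks in
// flight at once (limit <= 0 means no cap), mirroring the throttle +
// WaitGroup + locked-map pattern used by the new CheckProvable.
func checkSectors(ids []int, limit int, check func(int) error) map[int]string {
    if limit <= 0 {
        limit = len(ids)
    }

    bad := make(map[int]string)
    var badLk sync.Mutex

    throttle := make(chan struct{}, limit) // buffered channel as counting semaphore
    var wg sync.WaitGroup
    wg.Add(len(ids))

    for _, id := range ids {
        throttle <- struct{}{} // blocks while `limit` checks are already running

        go func(id int) {
            defer wg.Done()
            defer func() { <-throttle }() // release the slot

            if err := check(id); err != nil {
                badLk.Lock()
                bad[id] = err.Error()
                badLk.Unlock()
            }
        }(id)
    }

    wg.Wait() // every result is recorded before the map is returned
    return bad
}

func main() {
    slowCheck := func(id int) error {
        time.Sleep(10 * time.Millisecond) // stand-in for the per-sector proving check
        if id%3 == 0 {
            return fmt.Errorf("sector %d unprovable", id)
        }
        return nil
    }
    fmt.Println(checkSectors([]int{1, 2, 3, 4, 5, 6}, 2, slowCheck))
}

Acquiring the throttle slot in the loop, before spawning the goroutine, is what keeps at most limit checks running at any moment instead of launching one goroutine per sector up front; the real code additionally bails out with ctx.Err() if the context is cancelled while waiting for a slot.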

View File

@@ -70,6 +70,8 @@ type Manager struct {
     workLk sync.Mutex
     work   *statestore.StateStore

+    parallelCheckLimit int
+
     callToWork map[storiface.CallID]WorkID
     // used when we get an early return and there's no callToWork mapping
     callRes map[storiface.CallID]chan result
@@ -99,7 +101,7 @@ const (
     ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
 )

-type SealerConfig struct {
+type Config struct {
     ParallelFetchLimit int

     // Local worker config
@@ -116,6 +118,9 @@ type SealerConfig struct {
     // to use when evaluating tasks against this worker. An empty value defaults
     // to "hardware".
     ResourceFiltering ResourceFilteringStrategy
+
+    // PoSt config
+    ParallelCheckLimit int
 }

 type StorageAuth http.Header
@@ -123,7 +128,7 @@ type StorageAuth http.Header
 type WorkerStateStore *statestore.StateStore
 type ManagerStateStore *statestore.StateStore

-func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
+func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
     prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
     if err != nil {
         return nil, xerrors.Errorf("creating prover instance: %w", err)
@@ -142,6 +147,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
         localProver: prover,

+        parallelCheckLimit: sc.ParallelCheckLimit,
+
         work:       mss,
         callToWork: map[storiface.CallID]WorkID{},
         callRes:    map[storiface.CallID]chan result{},

View File

@@ -30,7 +30,7 @@ import (
 // only uses miner and does NOT use any remote worker.
 func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
     // Set up sector storage manager
-    sealerCfg := SealerConfig{
+    sealerCfg := Config{
         ParallelFetchLimit: 10,
         AllowAddPiece:      true,
         AllowPreCommit1:    true,
@@ -89,7 +89,7 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
     logging.SetAllLoggers(logging.LevelDebug)

     // miner's worker can only add pieces to an unsealed sector.
-    sealerCfg := SealerConfig{
+    sealerCfg := Config{
         ParallelFetchLimit: 10,
         AllowAddPiece:      true,
         AllowPreCommit1:    false,
@@ -198,7 +198,7 @@ func generatePieceData(size uint64) []byte {
     return bz
 }

-func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
+func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
     ctx := context.Background()
     // listen on tcp socket to create an http server later
     address := "0.0.0.0:0"

View File

@@ -585,7 +585,7 @@ func (n *Ensemble) Start() *Ensemble {
             // disable resource filtering so that local worker gets assigned tasks
             // regardless of system pressure.
-            node.Override(new(sectorstorage.SealerConfig), func() sectorstorage.SealerConfig {
+            node.Override(new(sectorstorage.Config), func() sectorstorage.Config {
                 scfg := config.DefaultStorageMiner()

                 if noLocal {
@@ -596,7 +596,7 @@ func (n *Ensemble) Start() *Ensemble {
                 }

                 scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
-                return scfg.Storage
+                return scfg.StorageManager()
             }),

             // upgrades

View File

@@ -214,7 +214,7 @@ func ConfigStorageMiner(c interface{}) Option {
             Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees, &cfg.Dealmaking)),
         ),

-        Override(new(sectorstorage.SealerConfig), cfg.Storage),
+        Override(new(sectorstorage.Config), cfg.StorageManager()),
         Override(new(*storage.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
     )
 }

View File

@@ -138,7 +138,7 @@ func DefaultStorageMiner() *StorageMiner {
             TerminateBatchWait: Duration(5 * time.Minute),
         },

-        Storage: sectorstorage.SealerConfig{
+        Storage: SealerConfig{
             AllowAddPiece:   true,
             AllowPreCommit1: true,
             AllowPreCommit2: true,

View File

@@ -620,6 +620,14 @@ over the worker address if this flag is set.`,
             Comment: ``,
         },
     },
+    "ProvingConfig": []DocField{
+        {
+            Name: "ParallelCheckLimit",
+            Type: "int",
+
+            Comment: `Maximum number of sector checks to run in parallel. (0 = unlimited)`,
+        },
+    },
     "Pubsub": []DocField{
         {
             Name: "Bootstrapper",
@@ -691,6 +699,70 @@ default value is true`,
 This parameter is ONLY applicable if the retrieval pricing policy strategy has been configured to "external".`,
         },
     },
+    "SealerConfig": []DocField{
+        {
+            Name: "ParallelFetchLimit",
+            Type: "int",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowAddPiece",
+            Type: "bool",
+
+            Comment: `Local worker config`,
+        },
+        {
+            Name: "AllowPreCommit1",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowPreCommit2",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowCommit",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowUnseal",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowReplicaUpdate",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowProveReplicaUpdate2",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "AllowRegenSectorKey",
+            Type: "bool",
+
+            Comment: ``,
+        },
+        {
+            Name: "ResourceFiltering",
+            Type: "sectorstorage.ResourceFilteringStrategy",
+
+            Comment: `ResourceFiltering instructs the system which resource filtering strategy
+to use when evaluating tasks against this worker. An empty value defaults
+to "hardware".`,
+        },
+    },
     "SealingConfig": []DocField{
         {
             Name: "MaxWaitDealsSectors",
@@ -933,6 +1005,12 @@ Default is 20 (about once a week).`,
             Comment: ``,
         },
+        {
+            Name: "Proving",
+            Type: "ProvingConfig",
+
+            Comment: ``,
+        },
         {
             Name: "Sealing",
             Type: "SealingConfig",
@@ -941,7 +1019,7 @@
         },
         {
             Name: "Storage",
-            Type: "sectorstorage.SealerConfig",
+            Type: "SealerConfig",

             Comment: ``,
         },

View File

@@ -8,6 +8,7 @@ import (
     "golang.org/x/xerrors"

+    sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
     "github.com/filecoin-project/lotus/extern/sector-storage/stores"
 )

@@ -49,3 +50,20 @@ func WriteStorageFile(path string, config stores.StorageConfig) error {

     return nil
 }
+
+func (c *StorageMiner) StorageManager() sectorstorage.Config {
+    return sectorstorage.Config{
+        ParallelFetchLimit:       c.Storage.ParallelFetchLimit,
+        AllowAddPiece:            c.Storage.AllowAddPiece,
+        AllowPreCommit1:          c.Storage.AllowPreCommit1,
+        AllowPreCommit2:          c.Storage.AllowPreCommit2,
+        AllowCommit:              c.Storage.AllowCommit,
+        AllowUnseal:              c.Storage.AllowUnseal,
+        AllowReplicaUpdate:       c.Storage.AllowReplicaUpdate,
+        AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
+        AllowRegenSectorKey:      c.Storage.AllowRegenSectorKey,
+        ResourceFiltering:        c.Storage.ResourceFiltering,
+
+        ParallelCheckLimit: c.Proving.ParallelCheckLimit,
+    }
+}
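
StorageManager() is what keeps the two halves of this change decoupled: the config package keeps the TOML-facing structs (SealerConfig plus the new ProvingConfig), while the sealing manager consumes the runtime sectorstorage.Config, and the node builder and itests now call cfg.StorageManager() instead of handing cfg.Storage over directly. A minimal sketch of that flow inside the lotus module, assuming the import paths used in this commit; the value 128 is purely illustrative (the default of 0 leaves the checks unlimited):

package main

import (
    "fmt"

    sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
    "github.com/filecoin-project/lotus/node/config"
)

func main() {
    // Start from the default miner config and set the new proving knob.
    cfg := config.DefaultStorageMiner()
    cfg.Proving.ParallelCheckLimit = 128 // illustrative; 0 keeps the unlimited default

    // StorageManager() folds the [Storage] and [Proving] sections into the
    // runtime config consumed by sectorstorage.New / the sealing Manager.
    var sc sectorstorage.Config = cfg.StorageManager()
    fmt.Println(sc.ParallelCheckLimit, sc.ParallelFetchLimit)
}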

View File

@@ -1,10 +1,9 @@
 package config

 import (
-    "github.com/ipfs/go-cid"
-
     "github.com/filecoin-project/lotus/chain/types"
     sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
+    "github.com/ipfs/go-cid"
 )

 // // NOTE: ONLY PUT STRUCT DEFINITIONS IN THIS FILE
@@ -53,8 +52,9 @@ type StorageMiner struct {
     Subsystems    MinerSubsystemConfig
     Dealmaking    DealmakingConfig
     IndexProvider IndexProviderConfig
+    Proving       ProvingConfig
     Sealing       SealingConfig
-    Storage       sectorstorage.SealerConfig
+    Storage       SealerConfig
     Fees          MinerFeeConfig
     Addresses     MinerAddressConfig
     DAGStore      DAGStoreConfig
@@ -216,6 +216,13 @@ type RetrievalPricingDefault struct {
     VerifiedDealsFreeTransfer bool
 }

+type ProvingConfig struct {
+    // Maximum number of sector checks to run in parallel. (0 = unlimited)
+    ParallelCheckLimit int
+
+    // todo disable builtin post
+}
+
 type SealingConfig struct {
     // Upper bound on how many sectors can be waiting for more deals to be packed in it before it begins sealing at any given time.
     // If the miner is accepting multiple deals in parallel, up to MaxWaitDealsSectors of new sectors will be created.
@@ -307,6 +314,25 @@ type SealingConfig struct {
     // todo TargetSectors - stop auto-pleding new sectors after this many sectors are sealed, default CC upgrade for deals sectors if above
 }

+type SealerConfig struct {
+    ParallelFetchLimit int
+
+    // Local worker config
+    AllowAddPiece            bool
+    AllowPreCommit1          bool
+    AllowPreCommit2          bool
+    AllowCommit              bool
+    AllowUnseal              bool
+    AllowReplicaUpdate       bool
+    AllowProveReplicaUpdate2 bool
+    AllowRegenSectorKey      bool
+
+    // ResourceFiltering instructs the system which resource filtering strategy
+    // to use when evaluating tasks against this worker. An empty value defaults
+    // to "hardware".
+    ResourceFiltering sectorstorage.ResourceFilteringStrategy
+}
+
 type BatchFeeConfig struct {
     Base      types.FIL
     PerSector types.FIL

View File

@@ -739,11 +739,11 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
     return stores.NewLocal(ctx, ls, si, urls)
 }

-func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
+func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.Config) *stores.Remote {
     return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
 }

-func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.SealerConfig, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
+func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sectorstorage.Config, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
     ctx := helpers.LifecycleCtx(mctx, lc)

     wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))