feat: #7747 sealing: Adding conf variable for capping number of concurrent unsealing jobs (#7884)

* adding the new variables- now time for logic

* putting parameters into right placeS

* adding unsealing throttle

* fixing linter issues

* removing one last thing...
This commit is contained in:
c r 2022-01-13 12:26:13 -06:00 committed by GitHub
parent 60fae3a59d
commit da6752eccb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 53 additions and 17 deletions

View File

@ -550,6 +550,14 @@
# env var: LOTUS_DAGSTORE_MAXCONCURRENTREADYFETCHES # env var: LOTUS_DAGSTORE_MAXCONCURRENTREADYFETCHES
#MaxConcurrentReadyFetches = 0 #MaxConcurrentReadyFetches = 0
# The maximum amount of unseals that can be processed simultaneously
# from the storage subsystem. 0 means unlimited.
# Default value: 0 (unlimited).
#
# type: int
# env var: LOTUS_DAGSTORE_MAXCONCURRENTUNSEALS
#MaxConcurrentUnseals = 5
# The maximum number of simultaneous inflight API calls to the storage # The maximum number of simultaneous inflight API calls to the storage
# subsystem. # subsystem.
# Default value: 100. # Default value: 100.

View File

@ -34,16 +34,24 @@ type minerAPI struct {
pieceStore piecestore.PieceStore pieceStore piecestore.PieceStore
sa SectorAccessor sa SectorAccessor
throttle throttle.Throttler throttle throttle.Throttler
unsealThrottle throttle.Throttler
readyMgr *shared.ReadyManager readyMgr *shared.ReadyManager
} }
var _ MinerAPI = (*minerAPI)(nil) var _ MinerAPI = (*minerAPI)(nil)
func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int) MinerAPI { func NewMinerAPI(store piecestore.PieceStore, sa SectorAccessor, concurrency int, unsealConcurrency int) MinerAPI {
var unsealThrottle throttle.Throttler
if unsealConcurrency == 0 {
unsealThrottle = throttle.Noop()
} else {
unsealThrottle = throttle.Fixed(unsealConcurrency)
}
return &minerAPI{ return &minerAPI{
pieceStore: store, pieceStore: store,
sa: sa, sa: sa,
throttle: throttle.Fixed(concurrency), throttle: throttle.Fixed(concurrency),
unsealThrottle: unsealThrottle,
readyMgr: shared.NewReadyManager(), readyMgr: shared.NewReadyManager(),
} }
} }
@ -152,13 +160,19 @@ func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (mo
} }
lastErr := xerrors.New("no sectors found to unseal from") lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal. // if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals { for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may // Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep // block for a long time with the current PoRep
// var reader mount.Reader
// This path is unthrottled. deal := deal
reader, err := m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.sa.UnsealSectorAt(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
if err != nil { if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error()) log.Warn(lastErr.Error())

View File

@ -75,7 +75,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
rpn := &mockRPN{ rpn := &mockRPN{
sectors: mockData, sectors: mockData,
} }
api := NewMinerAPI(ps, rpn, 100) api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx)) require.NoError(t, api.Start(ctx))
// Add deals to piece store // Add deals to piece store
@ -115,7 +115,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
ps := getPieceStore(t) ps := getPieceStore(t)
rpn := &mockRPN{} rpn := &mockRPN{}
api := NewMinerAPI(ps, rpn, 100) api := NewMinerAPI(ps, rpn, 100, 5)
require.NoError(t, api.Start(ctx)) require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10 // Add a deal with data Length 10
@ -142,7 +142,7 @@ func TestThrottle(t *testing.T) {
unsealedSectorID: "foo", unsealedSectorID: "foo",
}, },
} }
api := NewMinerAPI(ps, rpn, 3) api := NewMinerAPI(ps, rpn, 3, 5)
require.NoError(t, api.Start(ctx)) require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10 // Add a deal with data Length 10

View File

@ -96,7 +96,7 @@ func TestShardRegistration(t *testing.T) {
cfg := config.DefaultStorageMiner().DAGStore cfg := config.DefaultStorageMiner().DAGStore
cfg.RootDir = t.TempDir() cfg.RootDir = t.TempDir()
mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10) mapi := NewMinerAPI(ps, &wrappedSA{sa}, 10, 5)
dagst, w, err := NewDAGStore(cfg, mapi) dagst, w, err := NewDAGStore(cfg, mapi)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, dagst) require.NotNil(t, dagst)

View File

@ -216,6 +216,7 @@ func DefaultStorageMiner() *StorageMiner {
DAGStore: DAGStoreConfig{ DAGStore: DAGStoreConfig{
MaxConcurrentIndex: 5, MaxConcurrentIndex: 5,
MaxConcurrencyStorageCalls: 100, MaxConcurrencyStorageCalls: 100,
MaxConcurrentUnseals: 5,
GCInterval: Duration(1 * time.Minute), GCInterval: Duration(1 * time.Minute),
}, },
} }

View File

@ -162,6 +162,14 @@ Default value: 5.`,
Comment: `The maximum amount of unsealed deals that can be fetched simultaneously Comment: `The maximum amount of unsealed deals that can be fetched simultaneously
from the storage subsystem. 0 means unlimited. from the storage subsystem. 0 means unlimited.
Default value: 0 (unlimited).`,
},
{
Name: "MaxConcurrentUnseals",
Type: "int",
Comment: `The maximum amount of unseals that can be processed simultaneously
from the storage subsystem. 0 means unlimited.
Default value: 0 (unlimited).`, Default value: 0 (unlimited).`,
}, },
{ {

View File

@ -75,6 +75,11 @@ type DAGStoreConfig struct {
// Default value: 0 (unlimited). // Default value: 0 (unlimited).
MaxConcurrentReadyFetches int MaxConcurrentReadyFetches int
// The maximum amount of unseals that can be processed simultaneously
// from the storage subsystem. 0 means unlimited.
// Default value: 0 (unlimited).
MaxConcurrentUnseals int
// The maximum number of simultaneous inflight API calls to the storage // The maximum number of simultaneous inflight API calls to the storage
// subsystem. // subsystem.
// Default value: 100. // Default value: 100.

View File

@ -38,7 +38,7 @@ func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderP
} }
} }
mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls) mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals)
ready := make(chan error, 1) ready := make(chan error, 1)
pieceStore.OnReady(func(err error) { pieceStore.OnReady(func(err error) {
ready <- err ready <- err