dagstore: throttle calls to storage except actual unseals.
This commit is contained in:
parent db6b94cf1e
commit 650d6efb1a
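
For context before the diff: every storage-touching call is now funneled through a throttle built with throttle.Fixed and executed via Throttler.Do, the only two entry points of the throttle package used below. A minimal sketch of that pattern (the cap of 3, the worker loop, and the sleep are illustrative stand-ins, not code from this commit):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"

        "github.com/filecoin-project/dagstore/throttle"
    )

    func main() {
        // Allow at most 3 concurrent "storage calls", as the commit does
        // with MaxConcurrentStorageCalls.
        th := throttle.Fixed(3)

        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            i := i
            wg.Add(1)
            go func() {
                defer wg.Done()
                // Do blocks until one of the 3 slots frees up, then runs fn.
                _ = th.Do(context.Background(), func(ctx context.Context) error {
                    fmt.Println("storage call", i)
                    time.Sleep(100 * time.Millisecond) // stand-in for a storage round-trip
                    return nil
                })
            }()
        }
        wg.Wait()
    }

Do blocks until a slot frees up, so callers need no extra bookkeeping, and cancellation propagates through the context.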
@@ -2,7 +2,10 @@ package dagstore
 
 import (
     "context"
+    "fmt"
     "io"
+    "os"
+    "strconv"
 
     "github.com/filecoin-project/dagstore/throttle"
     "github.com/ipfs/go-cid"
@@ -13,10 +16,19 @@ import (
     "github.com/filecoin-project/go-fil-markets/shared"
 )
 
-// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for
-// unsealed pieces, so that we don't saturate IO or the network too much,
-// especially when bulk processing (e.g. at migration).
-var MaxConcurrentUnsealedFetches = 3
+// MaxConcurrentStorageCalls caps the amount of concurrent calls to the
+// storage, so that we don't spam it during heavy processes like bulk migration.
+var MaxConcurrentStorageCalls = func() int {
+    // TODO replace env with config.toml attribute.
+    v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY")
+    if ok {
+        concurrency, err := strconv.Atoi(v)
+        if err == nil {
+            return concurrency
+        }
+    }
+    return 100
+}()
 
 type LotusAccessor interface {
     FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
@@ -37,7 +49,7 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
     return &lotusAccessor{
         pieceStore: store,
         rm:         rm,
-        throttle:   throttle.Fixed(MaxConcurrentUnsealedFetches),
+        throttle:   throttle.Fixed(MaxConcurrentStorageCalls),
         readyMgr:   shared.NewReadyManager(),
     }
 }
@@ -74,34 +86,48 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
         return nil, err
     }
 
-    pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
+    // Throttle this path to avoid flooding the storage subsystem.
+    var pieceInfo piecestore.PieceInfo
+    err = m.throttle.Do(ctx, func(ctx context.Context) (err error) {
+        pieceInfo, err = m.pieceStore.GetPieceInfo(pieceCid)
+        return err
+    })
+
     if err != nil {
         return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
     }
 
     if len(pieceInfo.Deals) == 0 {
-        return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
+        return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
     }
 
     // prefer an unsealed sector containing the piece if one exists
     for _, deal := range pieceInfo.Deals {
-        isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
+        deal := deal
+
+        // Throttle this path to avoid flooding the storage subsystem.
+        var reader io.ReadCloser
+        err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
+            isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
+            if err != nil {
+                return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorID, deal.DealID, err)
+            }
+            if !isUnsealed {
+                return nil
+            }
+            // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
+            reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
+            return err
+        })
+
         if err != nil {
-            log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err)
-            continue
+            log.Warnf("failed to check/retrieve unsealed sector: %s", err)
+            continue // move on to the next match.
         }
-        if isUnsealed {
-            var reader io.ReadCloser
-            // We want to throttle this path, as these copies will be downloaded
-            // immediately from the storage cluster without any unsealing
-            // necessary.
-            deal := deal // calm the linter
-            err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
-                // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
-                reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
-                return err
-            })
-            return reader, err
+
+        if reader != nil {
+            // we were able to obtain a reader for an already unsealed piece
+            return reader, nil
         }
     }
 
@@ -110,6 +136,8 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
     for _, deal := range pieceInfo.Deals {
         // Note that if the deal data is not already unsealed, unsealing may
         // block for a long time with the current PoRep
+        //
+        // This path is unthrottled.
         reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
         if err != nil {
             lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
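
The first file's changes end here; the test file's changes follow. One usage note on the initializer above: LOTUS_DAGSTORE_MOUNT_CONCURRENCY is read once at package init, so it must be set before the process starts, and an unset or unparseable value falls back to 100. A standalone sketch that mirrors (rather than calls) that initializer's lookup logic; the value 50 is illustrative:

    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    func main() {
        // Pretend the operator exported this before starting the node.
        os.Setenv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY", "50")

        // Same lookup-and-fallback the package initializer performs.
        limit := 100 // default when the variable is unset or not an integer
        if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
            if n, err := strconv.Atoi(v); err == nil {
                limit = n
            }
        }
        fmt.Println("storage-call concurrency cap:", limit) // prints 50
    }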
@@ -123,6 +123,8 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
 }
 
 func TestThrottle(t *testing.T) {
+    MaxConcurrentStorageCalls = 3
+
     ctx := context.Background()
     cid1, err := cid.Parse("bafkqaaa")
     require.NoError(t, err)
@@ -160,7 +162,7 @@ func TestThrottle(t *testing.T) {
     }
 
     time.Sleep(500 * time.Millisecond)
-    require.EqualValues(t, MaxConcurrentUnsealedFetches, atomic.LoadInt32(&rpn.calls)) // throttled
+    require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled
 
     // allow to proceed.
     rpn.lk.Unlock()
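
TestThrottle pins MaxConcurrentStorageCalls to 3 and asserts that a blocked mock observes exactly that many concurrent calls. The same property can be checked outside the test harness; a hedged sketch (the goroutine count and sleep duration are arbitrary) that measures peak concurrency under throttle.Fixed(3):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "sync/atomic"
        "time"

        "github.com/filecoin-project/dagstore/throttle"
    )

    func main() {
        th := throttle.Fixed(3)

        var inflight, peak int32
        var wg sync.WaitGroup
        for i := 0; i < 16; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                _ = th.Do(context.Background(), func(ctx context.Context) error {
                    n := atomic.AddInt32(&inflight, 1)
                    defer atomic.AddInt32(&inflight, -1)
                    // Record the highest concurrency observed so far.
                    for {
                        p := atomic.LoadInt32(&peak)
                        if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
                            break
                        }
                    }
                    time.Sleep(20 * time.Millisecond) // hold the slot briefly
                    return nil
                })
            }()
        }
        wg.Wait()
        // Never exceeds the cap; with 16 goroutines it should reach exactly 3.
        fmt.Println("peak concurrency:", atomic.LoadInt32(&peak))
    }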