Merge pull request #6908 from filecoin-project/fix/dagstore-throttle

dagstore: throttle calls to storage except actual unseals.
This commit is contained in:
raulk 2021-07-28 16:44:04 +01:00 committed by GitHub
commit 80fcad5ad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 37 deletions

2
go.mod
View File

@@ -26,7 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0 github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0 github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.3.1 github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-bitfield v0.2.4

3
go.sum
View File

@@ -256,8 +256,9 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/filecoin-project/dagstore v0.3.1 h1:IShahbR3zEPE1pfHa5DiYMt/S40b9JuWZx6+kZPuyTQ=
github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384 h1:0LrfJHR6YAa8t7VEZmbNECW7Deov8Di9CZHsZaamiM4=
github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=

View File

@@ -2,7 +2,10 @@ package dagstore
import ( import (
"context" "context"
"fmt"
"io" "io"
"os"
"strconv"
"github.com/filecoin-project/dagstore/throttle" "github.com/filecoin-project/dagstore/throttle"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@@ -13,10 +16,19 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
) )
// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for // MaxConcurrentStorageCalls caps the amount of concurrent calls to the
// unsealed pieces, so that we don't saturate IO or the network too much, // storage, so that we don't spam it during heavy processes like bulk migration.
// especially when bulk processing (e.g. at migration). var MaxConcurrentStorageCalls = func() int {
var MaxConcurrentUnsealedFetches = 3 // TODO replace env with config.toml attribute.
v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY")
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
return concurrency
}
}
return 100
}()
type LotusAccessor interface { type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
@@ -37,7 +49,7 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
return &lotusAccessor{ return &lotusAccessor{
pieceStore: store, pieceStore: store,
rm: rm, rm: rm,
throttle: throttle.Fixed(MaxConcurrentUnsealedFetches), throttle: throttle.Fixed(MaxConcurrentStorageCalls),
readyMgr: shared.NewReadyManager(), readyMgr: shared.NewReadyManager(),
} }
} }
@@ -74,34 +86,48 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
return nil, err return nil, err
} }
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) // Throttle this path to avoid flooding the storage subsystem.
var pieceInfo piecestore.PieceInfo
err = m.throttle.Do(ctx, func(ctx context.Context) (err error) {
pieceInfo, err = m.pieceStore.GetPieceInfo(pieceCid)
return err
})
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
} }
if len(pieceInfo.Deals) == 0 { if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid) return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
} }
// prefer an unsealed sector containing the piece if one exists // prefer an unsealed sector containing the piece if one exists
for _, deal := range pieceInfo.Deals { for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) deal := deal
// Throttle this path to avoid flooding the storage subsystem.
var reader io.ReadCloser
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorID, deal.DealID, err)
}
if !isUnsealed {
return nil
}
// Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing.
reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
if err != nil { if err != nil {
log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err) log.Warnf("failed to check/retrieve unsealed sector: %s", err)
continue continue // move on to the next match.
} }
if isUnsealed {
var reader io.ReadCloser if reader != nil {
// We want to throttle this path, as these copies will be downloaded // we were able to obtain a reader for an already unsealed piece
// immediately from the storage cluster without any unsealing return reader, nil
// necessary.
deal := deal // calm the linter
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
return err
})
return reader, err
} }
} }
@@ -110,6 +136,8 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
for _, deal := range pieceInfo.Deals { for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may // Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep // block for a long time with the current PoRep
//
// This path is unthrottled.
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil { if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)

View File

@@ -123,6 +123,8 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
} }
func TestThrottle(t *testing.T) { func TestThrottle(t *testing.T) {
MaxConcurrentStorageCalls = 3
ctx := context.Background() ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa") cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err) require.NoError(t, err)
@@ -160,7 +162,7 @@ func TestThrottle(t *testing.T) {
} }
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
require.EqualValues(t, MaxConcurrentUnsealedFetches, atomic.LoadInt32(&rpn.calls)) // throttled require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled
// allow to proceed. // allow to proceed.
rpn.lk.Unlock() rpn.lk.Unlock()

View File

@@ -27,11 +27,12 @@ var log = logging.Logger("dagstore-wrapper")
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct { type MarketDAGStoreConfig struct {
TransientsDir string TransientsDir string
IndexDir string IndexDir string
Datastore ds.Datastore Datastore ds.Datastore
MaxConcurrentIndex int MaxConcurrentIndex int
GCInterval time.Duration MaxConcurrentCopies int
GCInterval time.Duration
} }
// DAGStore provides an interface for the DAG store that can be mocked out // DAGStore provides an interface for the DAG store that can be mocked out
@@ -89,8 +90,9 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
TraceCh: traceCh, TraceCh: traceCh,
// not limiting fetches globally, as the Lotus mount does // not limiting fetches globally, as the Lotus mount does
// conditional throttling. // conditional throttling.
MaxConcurrentIndex: cfg.MaxConcurrentIndex, MaxConcurrentIndex: cfg.MaxConcurrentIndex,
RecoverOnStart: dagstore.RecoverOnAcquire, MaxConcurrentCopies: cfg.MaxConcurrentCopies,
RecoverOnStart: dagstore.RecoverOnAcquire,
} }
dagStore, err := dagstore.NewDAGStore(dcfg) dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil { if err != nil {

View File

@@ -609,11 +609,12 @@ func DagStoreWrapper(
} }
cfg := dagstore.MarketDAGStoreConfig{ cfg := dagstore.MarketDAGStoreConfig{
TransientsDir: filepath.Join(dagStoreDir, "transients"), TransientsDir: filepath.Join(dagStoreDir, "transients"),
IndexDir: filepath.Join(dagStoreDir, "index"), IndexDir: filepath.Join(dagStoreDir, "index"),
Datastore: dagStoreDS, Datastore: dagStoreDS,
GCInterval: 1 * time.Minute, GCInterval: 1 * time.Minute,
MaxConcurrentIndex: 5, MaxConcurrentIndex: 5,
MaxConcurrentCopies: 2,
} }
dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor) dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)