implement conditional throttling in lotus mount.
This commit is contained in:
parent
18f5997d02
commit
109a8242ae
2
go.mod
2
go.mod
@ -26,7 +26,7 @@ require (
|
|||||||
github.com/elastic/gosigar v0.12.0
|
github.com/elastic/gosigar v0.12.0
|
||||||
github.com/etclabscore/go-openrpc-reflect v0.0.36
|
github.com/etclabscore/go-openrpc-reflect v0.0.36
|
||||||
github.com/fatih/color v1.9.0
|
github.com/fatih/color v1.9.0
|
||||||
github.com/filecoin-project/dagstore v0.3.0
|
github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8
|
||||||
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
|
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
|
||||||
github.com/filecoin-project/go-address v0.0.5
|
github.com/filecoin-project/go-address v0.0.5
|
||||||
github.com/filecoin-project/go-bitfield v0.2.4
|
github.com/filecoin-project/go-bitfield v0.2.4
|
||||||
|
3
go.sum
3
go.sum
@ -256,8 +256,9 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj
|
|||||||
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
|
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
|
||||||
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
|
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
|
||||||
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
||||||
github.com/filecoin-project/dagstore v0.3.0 h1:jYqycoe6WxK6CbARco17J4uqJr3pskC/8t6fqN/atbQ=
|
|
||||||
github.com/filecoin-project/dagstore v0.3.0/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI=
|
github.com/filecoin-project/dagstore v0.3.0/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI=
|
||||||
|
github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8 h1:uKrlFJ7k7PIfbAQPpuRHqFqjA/soAkfyPOHrkCg7tYw=
|
||||||
|
github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
|
||||||
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||||
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
|
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
|
||||||
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/dagstore/throttle"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -12,6 +13,11 @@ import (
|
|||||||
"github.com/filecoin-project/go-fil-markets/shared"
|
"github.com/filecoin-project/go-fil-markets/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for
|
||||||
|
// unsealed pieces, so that we don't saturate IO or the network too much,
|
||||||
|
// especially when bulk processing (e.g. at migration).
|
||||||
|
var MaxConcurrentUnsealedFetches = 3
|
||||||
|
|
||||||
type LotusAccessor interface {
|
type LotusAccessor interface {
|
||||||
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
|
||||||
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
|
||||||
@ -21,8 +27,8 @@ type LotusAccessor interface {
|
|||||||
type lotusAccessor struct {
|
type lotusAccessor struct {
|
||||||
pieceStore piecestore.PieceStore
|
pieceStore piecestore.PieceStore
|
||||||
rm retrievalmarket.RetrievalProviderNode
|
rm retrievalmarket.RetrievalProviderNode
|
||||||
|
throttle throttle.Throttler
|
||||||
readyMgr *shared.ReadyManager
|
readyMgr *shared.ReadyManager
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ LotusAccessor = (*lotusAccessor)(nil)
|
var _ LotusAccessor = (*lotusAccessor)(nil)
|
||||||
@ -31,6 +37,7 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP
|
|||||||
return &lotusAccessor{
|
return &lotusAccessor{
|
||||||
pieceStore: store,
|
pieceStore: store,
|
||||||
rm: rm,
|
rm: rm,
|
||||||
|
throttle: throttle.Fixed(MaxConcurrentUnsealedFetches),
|
||||||
readyMgr: shared.NewReadyManager(),
|
readyMgr: shared.NewReadyManager(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -84,11 +91,16 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if isUnsealed {
|
if isUnsealed {
|
||||||
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
|
var reader io.ReadCloser
|
||||||
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
// We want to throttle this path, as these copies will be downloaded
|
||||||
if err == nil {
|
// immediately from the storage cluster without any unsealing
|
||||||
return reader, nil
|
// necessary.
|
||||||
}
|
err := m.throttle.Do(ctx, func(ctx context.Context) (err error) {
|
||||||
|
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
|
||||||
|
reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return reader, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,12 +4,16 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
ds_sync "github.com/ipfs/go-datastore/sync"
|
ds_sync "github.com/ipfs/go-datastore/sync"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
@ -118,6 +122,57 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
|
|||||||
require.EqualValues(t, 10, len)
|
require.EqualValues(t, 10, len)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestThrottle(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
cid1, err := cid.Parse("bafkqaaa")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ps := getPieceStore(t)
|
||||||
|
rpn := &mockRPN{
|
||||||
|
sectors: map[abi.SectorNumber]string{
|
||||||
|
unsealedSectorID: "foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
api := NewLotusAccessor(ps, rpn)
|
||||||
|
require.NoError(t, api.Start(ctx))
|
||||||
|
|
||||||
|
// Add a deal with data Length 10
|
||||||
|
dealInfo := piecestore.DealInfo{
|
||||||
|
SectorID: unsealedSectorID,
|
||||||
|
Length: 10,
|
||||||
|
}
|
||||||
|
err = ps.AddDealForPiece(cid1, dealInfo)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// hold the lock to block.
|
||||||
|
rpn.lk.Lock()
|
||||||
|
|
||||||
|
// fetch the piece concurrently.
|
||||||
|
errgrp, ctx := errgroup.WithContext(context.Background())
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
errgrp.Go(func() error {
|
||||||
|
r, err := api.FetchUnsealedPiece(ctx, cid1)
|
||||||
|
if err == nil {
|
||||||
|
_ = r.Close()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
require.EqualValues(t, MaxConcurrentUnsealedFetches, atomic.LoadInt32(&rpn.calls)) // throttled
|
||||||
|
|
||||||
|
// allow to proceed.
|
||||||
|
rpn.lk.Unlock()
|
||||||
|
|
||||||
|
// allow all to finish.
|
||||||
|
err = errgrp.Wait()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.EqualValues(t, 10, atomic.LoadInt32(&rpn.calls)) // throttled
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func getPieceStore(t *testing.T) piecestore.PieceStore {
|
func getPieceStore(t *testing.T) piecestore.PieceStore {
|
||||||
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -128,10 +183,16 @@ func getPieceStore(t *testing.T) piecestore.PieceStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type mockRPN struct {
|
type mockRPN struct {
|
||||||
|
calls int32 // guarded by atomic
|
||||||
|
lk sync.RWMutex // lock to simulate blocks.
|
||||||
sectors map[abi.SectorNumber]string
|
sectors map[abi.SectorNumber]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
|
||||||
|
atomic.AddInt32(&m.calls, 1)
|
||||||
|
m.lk.RLock()
|
||||||
|
defer m.lk.RUnlock()
|
||||||
|
|
||||||
data, ok := m.sectors[sectorID]
|
data, ok := m.sectors[sectorID]
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("sector not found")
|
panic("sector not found")
|
||||||
|
@ -22,7 +22,9 @@ type LotusMount struct {
|
|||||||
PieceCid cid.Cid
|
PieceCid cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
// This method is called when registering a mount with the DAG store registry.
|
// NewLotusMountTemplate is called when registering a mount with
|
||||||
|
// the DAG store registry.
|
||||||
|
//
|
||||||
// The DAG store registry receives an instance of the mount (a "template").
|
// The DAG store registry receives an instance of the mount (a "template").
|
||||||
// When the registry needs to deserialize a mount it clones the template then
|
// When the registry needs to deserialize a mount it clones the template then
|
||||||
// calls Deserialize on the cloned instance, which will have a reference to the
|
// calls Deserialize on the cloned instance, which will have a reference to the
|
||||||
|
@ -82,13 +82,14 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
|||||||
}
|
}
|
||||||
|
|
||||||
dcfg := dagstore.Config{
|
dcfg := dagstore.Config{
|
||||||
TransientsDir: cfg.TransientsDir,
|
TransientsDir: cfg.TransientsDir,
|
||||||
IndexRepo: irepo,
|
IndexRepo: irepo,
|
||||||
Datastore: cfg.Datastore,
|
Datastore: cfg.Datastore,
|
||||||
MountRegistry: registry,
|
MountRegistry: registry,
|
||||||
FailureCh: failureCh,
|
FailureCh: failureCh,
|
||||||
TraceCh: traceCh,
|
TraceCh: traceCh,
|
||||||
MaxConcurrentFetch: cfg.MaxConcurrentFetch,
|
// not limiting fetches globally, as the Lotus mount does
|
||||||
|
// conditional throttling.
|
||||||
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
|
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
|
||||||
RecoverOnStart: dagstore.RecoverOnAcquire,
|
RecoverOnStart: dagstore.RecoverOnAcquire,
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user