Test Miner SimultaneousTransfers
This commit is contained in:
parent
37c5dd5afc
commit
e9dd3e8650
@ -1,12 +1,22 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
)
|
||||
|
||||
func TestDealCyclesConcurrent(t *testing.T) {
|
||||
@ -47,3 +57,71 @@ func TestDealCyclesConcurrent(t *testing.T) {
|
||||
t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimultenousTransferLimit(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping test in short mode")
|
||||
}
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
blockTime := 10 * time.Millisecond
|
||||
|
||||
// For these tests where the block time is artificially short, just use
|
||||
// a deal start epoch that is guaranteed to be far enough in the future
|
||||
// so that the deal starts sealing in time
|
||||
startEpoch := abi.ChainEpoch(2 << 12)
|
||||
|
||||
runTest := func(t *testing.T) {
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
|
||||
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(2))),
|
||||
))
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
du, err := miner.MarketDataTransferUpdates(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
var maxOngoing int
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
ongoing := map[datatransfer.TransferID]struct{}{}
|
||||
|
||||
for {
|
||||
select {
|
||||
case u := <-du:
|
||||
t.Logf("%d - %s", u.TransferID, datatransfer.Statuses[u.Status])
|
||||
if u.Status == datatransfer.Ongoing {
|
||||
ongoing[u.TransferID] = struct{}{}
|
||||
} else {
|
||||
delete(ongoing, u.TransferID)
|
||||
}
|
||||
|
||||
if len(ongoing) > maxOngoing {
|
||||
maxOngoing = len(ongoing)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
dh.RunConcurrentDeals(kit.RunConcurrentDealsOpts{
|
||||
N: 1, // TODO: set to 20 after https://github.com/ipfs/go-graphsync/issues/175 is fixed
|
||||
FastRetrieval: true,
|
||||
StartEpoch: startEpoch,
|
||||
})
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
require.LessOrEqual(t, maxOngoing, 2)
|
||||
}
|
||||
|
||||
runTest(t)
|
||||
}
|
||||
|
@ -238,7 +238,7 @@ var LibP2P = Options(
|
||||
Override(ConnGaterKey, lp2p.ConnGaterOption),
|
||||
)
|
||||
|
||||
func isType(t repo.RepoType) func(s *Settings) bool {
|
||||
func IsType(t repo.RepoType) func(s *Settings) bool {
|
||||
return func(s *Settings) bool { return s.nodeType == t }
|
||||
}
|
||||
|
||||
@ -468,7 +468,7 @@ func Online() Option {
|
||||
LibP2P,
|
||||
|
||||
ApplyIf(isFullOrLiteNode, ChainNode),
|
||||
ApplyIf(isType(repo.StorageMiner), MinerNode),
|
||||
ApplyIf(IsType(repo.StorageMiner), MinerNode),
|
||||
)
|
||||
}
|
||||
|
||||
@ -680,8 +680,8 @@ func Repo(r repo.Repo) Option {
|
||||
|
||||
Override(new(*dtypes.APIAlg), modules.APISecret),
|
||||
|
||||
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
|
||||
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
|
||||
ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)),
|
||||
ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)),
|
||||
)(settings)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user