Make batch deal input test less flaky
This commit is contained in:
parent
5c8498b603
commit
097baeb9b0
@ -8,6 +8,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -63,7 +64,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
|
|||||||
|
|
||||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
waitDealSealed(t, ctx, miner, client, deal, false)
|
waitDealSealed(t, ctx, miner, client, deal, false, false, nil)
|
||||||
|
|
||||||
// Retrieval
|
// Retrieval
|
||||||
info, err := client.ClientGetDealInfo(ctx, *deal)
|
info, err := client.ClientGetDealInfo(ctx, *deal)
|
||||||
@ -207,10 +208,11 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
|
|||||||
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
||||||
return func() (sealiface.Config, error) {
|
return func() (sealiface.Config, error) {
|
||||||
return sealiface.Config{
|
return sealiface.Config{
|
||||||
MaxWaitDealsSectors: 1,
|
MaxWaitDealsSectors: 2,
|
||||||
MaxSealingSectors: 1,
|
MaxSealingSectors: 1,
|
||||||
MaxSealingSectorsForDeals: 2,
|
MaxSealingSectorsForDeals: 3,
|
||||||
AlwaysKeepUnsealedCopy: true,
|
AlwaysKeepUnsealedCopy: true,
|
||||||
|
WaitDealsDelay: time.Hour,
|
||||||
}, nil
|
}, nil
|
||||||
}, nil
|
}, nil
|
||||||
}),
|
}),
|
||||||
@ -225,18 +227,40 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
|
|||||||
s := connectAndStartMining(t, b, blocktime, client, miner)
|
s := connectAndStartMining(t, b, blocktime, client, miner)
|
||||||
defer s.blockMiner.Stop()
|
defer s.blockMiner.Stop()
|
||||||
|
|
||||||
|
checkNoPadding := func() {
|
||||||
|
sl, err := sn[0].SectorsList(s.ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
sort.Slice(sl, func(i, j int) bool {
|
||||||
|
return sl[i] < sl[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, snum := range sl {
|
||||||
|
si, err := sn[0].SectorsStatus(s.ctx, snum, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State)
|
||||||
|
|
||||||
|
for _, deal := range si.Deals {
|
||||||
|
if deal == 0 {
|
||||||
|
fmt.Printf("sector %d had a padding piece!\n", snum)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Starts a deal and waits until it's published
|
// Starts a deal and waits until it's published
|
||||||
runDealTillSeal := func(rseed int) {
|
runDealTillSeal := func(rseed int) {
|
||||||
res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
|
res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
|
dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
|
||||||
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
|
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false, true, checkNoPadding)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run maxDealsPerMsg+1 deals in parallel
|
// Run maxDealsPerMsg deals in parallel
|
||||||
done := make(chan struct{}, maxDealsPerMsg+1)
|
done := make(chan struct{}, maxDealsPerMsg)
|
||||||
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
|
for rseed := 0; rseed < int(maxDealsPerMsg); rseed++ {
|
||||||
rseed := rseed
|
rseed := rseed
|
||||||
go func() {
|
go func() {
|
||||||
runDealTillSeal(rseed)
|
runDealTillSeal(rseed)
|
||||||
@ -249,10 +273,12 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
|
|||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
checkNoPadding()
|
||||||
|
|
||||||
sl, err := sn[0].SectorsList(s.ctx)
|
sl, err := sn[0].SectorsList(s.ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.GreaterOrEqual(t, len(sl), expectSectors)
|
require.Equal(t, len(sl), expectSectors)
|
||||||
require.LessOrEqual(t, len(sl), expectSectors+1)
|
//require.LessOrEqual(t, len(sl), expectSectors+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -314,12 +340,12 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
|
|||||||
|
|
||||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true)
|
waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true, false, nil)
|
||||||
|
|
||||||
deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0)
|
deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0)
|
||||||
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false)
|
waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false, false, nil)
|
||||||
|
|
||||||
// Retrieval
|
// Retrieval
|
||||||
info, err := s.client.ClientGetDealInfo(s.ctx, *deal2)
|
info, err := s.client.ClientGetDealInfo(s.ctx, *deal2)
|
||||||
@ -375,7 +401,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
|
|||||||
return deal
|
return deal
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal bool) {
|
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal, noSealStart bool, cb func()) {
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
di, err := client.ClientGetDealInfo(ctx, *deal)
|
di, err := client.ClientGetDealInfo(ctx, *deal)
|
||||||
@ -387,7 +413,9 @@ loop:
|
|||||||
if noseal {
|
if noseal {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
startSealingWaiting(t, ctx, miner)
|
if !noSealStart {
|
||||||
|
startSealingWaiting(t, ctx, miner)
|
||||||
|
}
|
||||||
case storagemarket.StorageDealProposalRejected:
|
case storagemarket.StorageDealProposalRejected:
|
||||||
t.Fatal("deal rejected")
|
t.Fatal("deal rejected")
|
||||||
case storagemarket.StorageDealFailing:
|
case storagemarket.StorageDealFailing:
|
||||||
@ -398,8 +426,25 @@ loop:
|
|||||||
fmt.Println("COMPLETE", di)
|
fmt.Println("COMPLETE", di)
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
|
|
||||||
|
mds, err := miner.MarketListIncompleteDeals(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var minerState storagemarket.StorageDealStatus
|
||||||
|
for _, md := range mds {
|
||||||
|
if md.DealID == di.DealID {
|
||||||
|
minerState = md.State
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Deal %d state: client:%s provider:%s\n", di.DealID, storagemarket.DealStates[di.State], storagemarket.DealStates[minerState])
|
||||||
time.Sleep(time.Second / 2)
|
time.Sleep(time.Second / 2)
|
||||||
|
if cb != nil {
|
||||||
|
cb()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
|
|||||||
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
waitDealSealed(t, ctx, provider, client, deal, false)
|
waitDealSealed(t, ctx, provider, client, deal, false, false, nil)
|
||||||
|
|
||||||
<-minedTwo
|
<-minedTwo
|
||||||
|
|
||||||
|
2
extern/storage-sealing/fsm.go
vendored
2
extern/storage-sealing/fsm.go
vendored
@ -225,6 +225,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
|
|
||||||
func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
|
func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
|
log.Debugw("sector event", "sector", state.SectorNumber, "type", fmt.Sprintf("%T", event.User), "event", event.User)
|
||||||
|
|
||||||
e, err := json.Marshal(event)
|
e, err := json.Marshal(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("marshaling event for logging: %+v", err)
|
log.Errorf("marshaling event for logging: %+v", err)
|
||||||
|
1
extern/storage-sealing/input.go
vendored
1
extern/storage-sealing/input.go
vendored
@ -440,6 +440,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
|
|||||||
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
||||||
m.startupWait.Wait()
|
m.startupWait.Wait()
|
||||||
|
|
||||||
|
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
|
||||||
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
2
extern/storage-sealing/states_sealing.go
vendored
2
extern/storage-sealing/states_sealing.go
vendored
@ -37,7 +37,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// todo: return to the sealing queue (this is extremely unlikely to happen)
|
// todo: return to the sealing queue (this is extremely unlikely to happen)
|
||||||
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
|
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector %d entered packing state early", sector.SectorNumber))
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
||||||
|
@ -104,6 +104,8 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infow("sub precommit", "deal", dealInfo.DealID)
|
||||||
|
|
||||||
// Not yet active, start matching against incoming messages
|
// Not yet active, start matching against incoming messages
|
||||||
return false, true, nil
|
return false, true, nil
|
||||||
}
|
}
|
||||||
|
@ -63,6 +63,24 @@ func TestAPIDealFlow(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBatchDealInput(t *testing.T) {
|
||||||
|
logging.SetLogLevel("miner", "ERROR")
|
||||||
|
logging.SetLogLevel("chainstore", "ERROR")
|
||||||
|
logging.SetLogLevel("chain", "ERROR")
|
||||||
|
logging.SetLogLevel("sub", "ERROR")
|
||||||
|
logging.SetLogLevel("storageminer", "ERROR")
|
||||||
|
logging.SetLogLevel("sectors", "DEBUG")
|
||||||
|
|
||||||
|
blockTime := 10 * time.Millisecond
|
||||||
|
|
||||||
|
// For these tests where the block time is artificially short, just use
|
||||||
|
// a deal start epoch that is guaranteed to be far enough in the future
|
||||||
|
// so that the deal starts sealing in time
|
||||||
|
dealStartEpoch := abi.ChainEpoch(2 << 12)
|
||||||
|
|
||||||
|
test.TestBatchDealInput(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||||
|
}
|
||||||
|
|
||||||
func TestAPIDealFlowReal(t *testing.T) {
|
func TestAPIDealFlowReal(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("skipping test in short mode")
|
t.Skip("skipping test in short mode")
|
||||||
|
Loading…
Reference in New Issue
Block a user