Make batch deal input test less flaky

Łukasz Magiera 2021-05-30 15:13:38 +02:00
parent c8d603557b
commit 9475079b97
7 changed files with 68 additions and 16 deletions
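In short: TestBatchDealInput now allows one more waiting sector and one more deal sector, sets an hour-long WaitDealsDelay (presumably so open sectors don't start packing on a timer mid-test), and asserts an exact sector count instead of a range. waitDealSealed gains a noSealStart flag (skip startSealingWaiting and let the batching config decide when packing begins) and an optional callback run on every poll iteration; the batch test passes checkNoPadding there. A minimal self-contained sketch of that poll-with-callback shape, assuming nothing from lotus (waitUntil and the timings are hypothetical):

package main

import (
	"fmt"
	"time"
)

// waitUntil polls check until it reports true, invoking cb (if non-nil) on
// every iteration, mirroring the new cb parameter of waitDealSealed.
func waitUntil(check func() bool, cb func()) {
	for !check() {
		if cb != nil {
			cb()
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	deadline := time.Now().Add(300 * time.Millisecond)
	waitUntil(func() bool { return time.Now().After(deadline) }, func() {
		fmt.Println("still waiting")
	})
	fmt.Println("done")
}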

View File

@@ -8,6 +8,7 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
@@ -63,7 +64,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal, false)
waitDealSealed(t, ctx, miner, client, deal, false, false, nil)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal)
@@ -207,10 +208,11 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 1,
MaxWaitDealsSectors: 2,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 2,
MaxSealingSectorsForDeals: 3,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
}, nil
}, nil
}),
@@ -225,18 +227,40 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()
checkNoPadding := func() {
sl, err := sn[0].SectorsList(s.ctx)
require.NoError(t, err)
sort.Slice(sl, func(i, j int) bool {
return sl[i] < sl[j]
})
for _, snum := range sl {
si, err := sn[0].SectorsStatus(s.ctx, snum, false)
require.NoError(t, err)
fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State)
for _, deal := range si.Deals {
if deal == 0 {
fmt.Printf("sector %d had a padding piece!\n", snum)
}
}
}
}
// Starts a deal and waits until it's published
runDealTillSeal := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
require.NoError(t, err)
dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false, true, checkNoPadding)
}
// Run maxDealsPerMsg+1 deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
// Run maxDealsPerMsg deals in parallel
done := make(chan struct{}, maxDealsPerMsg)
for rseed := 0; rseed < int(maxDealsPerMsg); rseed++ {
rseed := rseed
go func() {
runDealTillSeal(rseed)
@@ -249,10 +273,12 @@ func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, sta
<-done
}
checkNoPadding()
sl, err := sn[0].SectorsList(s.ctx)
require.NoError(t, err)
require.GreaterOrEqual(t, len(sl), expectSectors)
require.LessOrEqual(t, len(sl), expectSectors+1)
require.Equal(t, len(sl), expectSectors)
//require.LessOrEqual(t, len(sl), expectSectors+1)
}
}
@@ -314,12 +340,12 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true)
waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true, false, nil)
deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0)
time.Sleep(time.Second)
waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false)
waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false, false, nil)
// Retrieval
info, err := s.client.ClientGetDealInfo(s.ctx, *deal2)
@@ -375,7 +401,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
return deal
}
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal bool) {
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal, noSealStart bool, cb func()) {
loop:
for {
di, err := client.ClientGetDealInfo(ctx, *deal)
@@ -387,7 +413,9 @@ loop:
if noseal {
return
}
startSealingWaiting(t, ctx, miner)
if !noSealStart {
startSealingWaiting(t, ctx, miner)
}
case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected")
case storagemarket.StorageDealFailing:
@@ -398,8 +426,25 @@ loop:
fmt.Println("COMPLETE", di)
break loop
}
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
mds, err := miner.MarketListIncompleteDeals(ctx)
if err != nil {
t.Fatal(err)
}
var minerState storagemarket.StorageDealStatus
for _, md := range mds {
if md.DealID == di.DealID {
minerState = md.State
break
}
}
fmt.Printf("Deal %d state: client:%s provider:%s\n", di.DealID, storagemarket.DealStates[di.State], storagemarket.DealStates[minerState])
time.Sleep(time.Second / 2)
if cb != nil {
cb()
}
}
}
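For reference, the deals above are fanned out with a buffered done channel rather than a sync.WaitGroup; a minimal self-contained sketch of the same pattern (runTask is a hypothetical stand-in for runDealTillSeal, n for maxDealsPerMsg):

package main

import "fmt"

// runTask is a hypothetical stand-in for runDealTillSeal.
func runTask(seed int) {
	fmt.Println("task", seed, "done")
}

func main() {
	const n = 4 // stands in for maxDealsPerMsg

	// Buffered so each goroutine can signal completion without blocking.
	done := make(chan struct{}, n)
	for seed := 0; seed < n; seed++ {
		seed := seed // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
		go func() {
			runTask(seed)
			done <- struct{}{}
		}()
	}
	// Wait for every task to finish.
	for i := 0; i < n; i++ {
		<-done
	}
}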

View File

@@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, provider, client, deal, false)
waitDealSealed(t, ctx, provider, client, deal, false, false, nil)
<-minedTwo

View File

@@ -194,6 +194,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
for _, event := range events {
log.Debugw("sector event", "sector", state.SectorNumber, "type", fmt.Sprintf("%T", event.User), "event", event.User)
e, err := json.Marshal(event)
if err != nil {
log.Errorf("marshaling event for logging: %+v", err)
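The added lines marshal each state-machine event to JSON for debug logging; a self-contained sketch of that pattern (the event type here is hypothetical, not the lotus statemachine.Event):

package main

import (
	"encoding/json"
	"log"
)

// event is a hypothetical stand-in for a sector state-machine event.
type event struct {
	Sector uint64 `json:"sector"`
	Kind   string `json:"kind"`
}

func logEvent(e event) {
	b, err := json.Marshal(e)
	if err != nil {
		log.Printf("marshaling event for logging: %+v", err)
		return
	}
	log.Printf("sector event: %s", b)
}

func main() {
	logEvent(event{Sector: 7, Kind: "SectorStartPacking"})
}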

View File

@@ -436,6 +436,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
}
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorStartPacking{})
}

View File

@@ -35,7 +35,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
}
// todo: return to the sealing queue (this is extremely unlikely to happen)
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector %d entered packing state early", sector.SectorNumber))
}
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))

View File

@@ -103,6 +103,8 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
}
}
log.Infow("sub precommit", "deal", dealInfo.DealID)
// Not yet active, start matching against incoming messages
return false, true, nil
}
@@ -154,6 +156,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
for _, did := range params.DealIDs {
if did == res.DealID {
// Found the deal ID in this message. Callback with the sector ID.
log.Infow("deal precommit found", "deal", res.DealID, "sector", params.SectorNumber)
cb(params.SectorNumber, false, nil)
return false, nil
}

View File

@@ -66,6 +66,7 @@ func TestBatchDealInput(t *testing.T) {
logging.SetLogLevel("chain", "ERROR")
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
logging.SetLogLevel("sectors", "DEBUG")
blockTime := 10 * time.Millisecond