storagefsm: Fix batch deal packing behavior

Łukasz Magiera 2021-04-14 20:26:07 +02:00
parent 3ea39f76e1
commit c8d603557b
5 changed files with 106 additions and 77 deletions

View File

@@ -51,7 +51,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, sta
 }

 func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
-    res, data, err := CreateClientFile(ctx, client, rseed)
+    res, data, err := CreateClientFile(ctx, client, rseed, 0)
     if err != nil {
         t.Fatal(err)
     }
@@ -72,8 +72,11 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
     testRetrieval(t, ctx, client, fcid, &info.PieceCID, carExport, data)
 }

-func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) {
-    data := make([]byte, 1600)
+func CreateClientFile(ctx context.Context, client api.FullNode, rseed, size int) (*api.ImportRes, []byte, error) {
+    if size == 0 {
+        size = 1600
+    }
+    data := make([]byte, size)
     rand.New(rand.NewSource(int64(rseed))).Read(data)

     dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-")
@@ -119,7 +122,7 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
     // Starts a deal and waits until it's published
     runDealTillPublish := func(rseed int) {
-        res, _, err := CreateClientFile(s.ctx, s.client, rseed)
+        res, _, err := CreateClientFile(s.ctx, s.client, rseed, 0)
         require.NoError(t, err)

         upds, err := client.ClientGetDealUpdates(s.ctx)
@@ -186,68 +189,76 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
 }

 func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
-    publishPeriod := 10 * time.Second
-    maxDealsPerMsg := uint64(4)
+    run := func(piece, deals, expectSectors int) func(t *testing.T) {
+        return func(t *testing.T) {
+            publishPeriod := 10 * time.Second
+            maxDealsPerMsg := uint64(deals)

     // Set max deals per publish deals message to maxDealsPerMsg
     minerDef := []StorageMiner{{
         Full: 0,
         Opts: node.Options(
             node.Override(
                 new(*storageadapter.DealPublisher),
                 storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
                     Period:         publishPeriod,
                     MaxDealsPerMsg: maxDealsPerMsg,
                 })),
             node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
                 return func() (sealiface.Config, error) {
                     return sealiface.Config{
                         MaxWaitDealsSectors:       1,
                         MaxSealingSectors:         1,
                         MaxSealingSectorsForDeals: 2,
                         AlwaysKeepUnsealedCopy:    true,
                     }, nil
                 }, nil
             }),
         ),
         Preseal: PresealGenesis,
     }}

     // Create a connect client and miner node
     n, sn := b(t, OneFull, minerDef)
     client := n[0].FullNode.(*impl.FullNodeAPI)
     miner := sn[0]
     s := connectAndStartMining(t, b, blocktime, client, miner)
     defer s.blockMiner.Stop()

     // Starts a deal and waits until it's published
     runDealTillSeal := func(rseed int) {
-        res, _, err := CreateClientFile(s.ctx, s.client, rseed)
+        res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
         require.NoError(t, err)

         dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
         waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
     }

-    // Run maxDealsPerMsg+1 deals in parallel
-    done := make(chan struct{}, maxDealsPerMsg+1)
-    for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
-        rseed := rseed
-        go func() {
-            runDealTillSeal(rseed)
-            done <- struct{}{}
-        }()
-    }
-
-    // Wait for maxDealsPerMsg of the deals to be published
-    for i := 0; i < int(maxDealsPerMsg); i++ {
-        <-done
-    }
-
-    sl, err := sn[0].SectorsList(s.ctx)
-    require.NoError(t, err)
-    require.GreaterOrEqual(t, len(sl), 4)
-    require.LessOrEqual(t, len(sl), 5)
+            // Run maxDealsPerMsg+1 deals in parallel
+            done := make(chan struct{}, maxDealsPerMsg+1)
+            for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
+                rseed := rseed
+                go func() {
+                    runDealTillSeal(rseed)
+                    done <- struct{}{}
+                }()
+            }
+
+            // Wait for maxDealsPerMsg of the deals to be published
+            for i := 0; i < int(maxDealsPerMsg); i++ {
+                <-done
+            }
+
+            sl, err := sn[0].SectorsList(s.ctx)
+            require.NoError(t, err)
+            require.GreaterOrEqual(t, len(sl), expectSectors)
+            require.LessOrEqual(t, len(sl), expectSectors+1)
+        }
+    }
+
+    t.Run("4-p1600B", run(1600, 4, 4))
+    t.Run("4-p513B", run(513, 4, 2))
+    t.Run("32-p257B", run(257, 32, 8))
 }

 func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
@@ -430,7 +441,7 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
         si, err := miner.SectorsStatus(ctx, snum, false)
         require.NoError(t, err)

-        t.Logf("Sector state: %s", si.State)
+        t.Logf("Sector %d state: %s", snum, si.State)
         if si.State == api.SectorState(sealing.WaitDeals) {
             require.NoError(t, miner.SectorStartSealing(ctx, snum))
         }
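Note: the expectSectors values in the new subtests follow from Filecoin piece padding, assuming the 2 KiB sectors used by the test harness. A deal payload is zero-padded up to the next valid unpadded piece size (127*2^k bytes, since fr32 padding stores 127 data bytes per 128 on disk), and a 2 KiB sector holds 2048*127/128 = 2032 unpadded bytes. A minimal sketch of that arithmetic (the helper below is illustrative, not a lotus API):

package main

import "fmt"

// nextUnpadded returns the smallest valid unpadded piece size >= n.
// Valid sizes are 127 * 2^k because fr32 padding stores 127 data
// bytes per 128 bytes, and padded piece sizes are powers of two.
func nextUnpadded(n uint64) uint64 {
    s := uint64(127)
    for s < n {
        s *= 2
    }
    return s
}

func main() {
    const sectorUnpadded = 2032 // 2 KiB sector: 2048 * 127 / 128

    for _, piece := range []uint64{1600, 513, 257} {
        slot := nextUnpadded(piece)
        fmt.Printf("%4dB piece -> %4dB slot, %d per sector\n",
            piece, slot, sectorUnpadded/slot)
    }
    // 1600B -> 2032B slot, 1 per sector: 4 deals fill 4 sectors
    //  513B -> 1016B slot, 2 per sector: 4 deals fill 2 sectors
    //  257B ->  508B slot, 4 per sector: 32 deals fill 8 sectors
}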

View File

@@ -44,7 +44,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
     // Create a deal (non-interactive)
     // client deal --start-epoch=<start epoch> <cid> <miner addr> 1000000attofil <duration>
-    res, _, err := test.CreateClientFile(ctx, clientNode, 1)
+    res, _, err := test.CreateClientFile(ctx, clientNode, 1, 0)
     require.NoError(t, err)
     startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
     dataCid := res.Root
@@ -60,7 +60,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
     // <miner addr>
     // "no" (verified client)
     // "yes" (confirm deal)
-    res, _, err = test.CreateClientFile(ctx, clientNode, 2)
+    res, _, err = test.CreateClientFile(ctx, clientNode, 2, 0)
     require.NoError(t, err)
     dataCid2 := res.Root
     duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay)

View File

@@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
     AddPiece: planOne(
         on(SectorPieceAdded{}, WaitDeals),
         apply(SectorStartPacking{}),
+        apply(SectorAddPiece{}),
         on(SectorAddPieceFailed{}, AddPieceFailed),
     ),
     Packing: planOne(on(SectorPacked{}, GetTicket)),
@@ -534,6 +535,7 @@ func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error))
 func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
     return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
+    eloop:
         for i, event := range events {
             if gm, ok := event.User.(globalMutator); ok {
                 gm.applyGlobal(state)
@@ -556,6 +558,8 @@ func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err e
                 if err != nil || !more {
                     return uint64(i + 1), err
                 }
+
+                continue eloop
             }

             _, ok := event.User.(Ignorable)
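Note: continue eloop uses Go's labeled continue so that, once an event matches one of the planner's handlers, control jumps straight to the next event instead of falling through to the Ignorable check below the handler loop. A standalone sketch of the pattern (the names are stand-ins, not the lotus types):

package main

import "fmt"

func main() {
    events := []string{"AddPiece", "Unknown", "StartPacking"}
    handlers := []string{"AddPiece", "StartPacking"}

eloop:
    for _, ev := range events {
        for _, h := range handlers {
            if ev == h {
                fmt.Println("handled:", ev)
                continue eloop // next event; skip the fallthrough below
            }
        }
        fmt.Println("unhandled (maybe ignorable):", ev)
    }
}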

View File

@@ -27,6 +27,14 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
     m.inputLk.Lock()

+    sid := m.minerSectorID(sector.SectorNumber)
+
+    if len(m.assignedPieces[sid]) > 0 {
+        m.inputLk.Unlock()
+        // got assigned more pieces in the AddPiece state
+        return ctx.Send(SectorAddPiece{})
+    }
+
     started, err := m.maybeStartSealing(ctx, sector, used)
     if err != nil || started {
         delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
@@ -36,16 +44,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
         return err
     }

-    m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
-        used: used,
-        maybeAccept: func(cid cid.Cid) error {
-            // todo check deal start deadline (configurable)
-
-            sid := m.minerSectorID(sector.SectorNumber)
-            m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
-
-            return ctx.Send(SectorAddPiece{})
-        },
+    if _, has := m.openSectors[sid]; !has {
+        m.openSectors[sid] = &openSector{
+            used: used,
+            maybeAccept: func(cid cid.Cid) error {
+                // todo check deal start deadline (configurable)
+                m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
+
+                return ctx.Send(SectorAddPiece{})
+            },
+        }
     }

     go func() {
@@ -350,11 +358,19 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
             continue
         }

+        avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used
+
+        if mt.size > avail {
+            continue
+        }
+
         err := m.openSectors[mt.sector].maybeAccept(mt.deal)
         if err != nil {
             m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
         }

+        m.openSectors[mt.sector].used += mt.padding + mt.size
+
         m.pendingPieces[mt.deal].assigned = true
         delete(toAssign, mt.deal)
@@ -362,8 +378,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
             log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
             continue
         }
-
-        delete(m.openSectors, mt.sector)
     }

     if len(toAssign) > 0 {
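Note: together these hunks change the packing policy. An open sector now stays in m.openSectors and keeps accepting pieces, with used tracking how much unpadded space is committed; a piece that no longer fits is skipped rather than force-assigned, and the sector is no longer dropped from the open set after its first deal. A simplified sketch of the accounting (plain types standing in for the lotus ones):

package main

import "fmt"

type openSector struct{ used uint64 }

func main() {
    const sectorUnpadded = 2032 // e.g. a 2 KiB sector: 2048 * 127 / 128

    sec := &openSector{}
    pieces := []uint64{1016, 508, 508, 508} // unpadded piece sizes to assign

    for _, size := range pieces {
        avail := sectorUnpadded - sec.used
        if size > avail {
            fmt.Printf("piece %4d: skipped, only %d free\n", size, avail)
            continue
        }
        sec.used += size // the real code also adds inter-piece padding
        fmt.Printf("piece %4d: accepted, used %d/%d\n", size, sec.used, sectorUnpadded)
    }
}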

View File

@@ -244,13 +244,13 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err
         return nil, err
     }

-    out := make([]abi.SectorNumber, len(sectors))
-    for i, sector := range sectors {
+    out := make([]abi.SectorNumber, 0, len(sectors))
+    for _, sector := range sectors {
         if sector.State == sealing.UndefinedSectorState {
             continue // sector ID not set yet
         }
-        out[i] = sector.SectorNumber
+        out = append(out, sector.SectorNumber)
     }

     return out, nil
 }