Merge pull request #6519 from filecoin-project/jen/bakcport-6041
Backport #6041 - storagefsm: Fix batch deal packing behavior
Commit: 00372e144d
@@ -8,6 +8,7 @@ import (
 	"math/rand"
 	"os"
 	"path/filepath"
+	"sort"
 	"testing"
 	"time"
 
@ -18,6 +19,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
@@ -51,7 +53,7 @@ func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, sta
 }
 
 func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
-	res, data, err := CreateClientFile(ctx, client, rseed)
+	res, data, err := CreateClientFile(ctx, client, rseed, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -63,7 +65,7 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
 
 	// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
 	time.Sleep(time.Second)
-	waitDealSealed(t, ctx, miner, client, deal, false)
+	waitDealSealed(t, ctx, miner, client, deal, false, false, nil)
 
 	// Retrieval
 	info, err := client.ClientGetDealInfo(ctx, *deal)
@@ -72,8 +74,11 @@ func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode,
 	testRetrieval(t, ctx, client, fcid, &info.PieceCID, carExport, data)
 }
 
-func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api.ImportRes, []byte, error) {
-	data := make([]byte, 1600)
+func CreateClientFile(ctx context.Context, client api.FullNode, rseed, size int) (*api.ImportRes, []byte, error) {
+	if size == 0 {
+		size = 1600
+	}
+	data := make([]byte, size)
 	rand.New(rand.NewSource(int64(rseed))).Read(data)
 
 	dir, err := ioutil.TempDir(os.TempDir(), "test-make-deal-")
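The new size parameter keeps every existing call site working by treating zero as "use the old default". A minimal sketch of that zero-value-means-default pattern, with hypothetical names (makePayload and defaultSize are not from the PR):

package main

import "fmt"

// defaultSize mirrors the 1600-byte default that CreateClientFile keeps.
const defaultSize = 1600

// makePayload treats size == 0 as "caller has no preference", so legacy
// callers can pass 0 and get the old behavior unchanged.
func makePayload(size int) []byte {
	if size == 0 {
		size = defaultSize
	}
	return make([]byte, size)
}

func main() {
	fmt.Println(len(makePayload(0)))   // 1600, the old default
	fmt.Println(len(makePayload(513))) // 513, an explicit piece size
}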
@@ -119,7 +124,7 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
 
 	// Starts a deal and waits until it's published
 	runDealTillPublish := func(rseed int) {
-		res, _, err := CreateClientFile(s.ctx, s.client, rseed)
+		res, _, err := CreateClientFile(s.ctx, s.client, rseed, 0)
 		require.NoError(t, err)
 
 		upds, err := client.ClientGetDealUpdates(s.ctx)
@@ -186,68 +191,109 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
 }
 
 func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
-	publishPeriod := 10 * time.Second
-	maxDealsPerMsg := uint64(4)
+	run := func(piece, deals, expectSectors int) func(t *testing.T) {
+		return func(t *testing.T) {
+			publishPeriod := 10 * time.Second
+			maxDealsPerMsg := uint64(deals)
 
 			// Set max deals per publish deals message to maxDealsPerMsg
 			minerDef := []StorageMiner{{
 				Full: 0,
 				Opts: node.Options(
 					node.Override(
 						new(*storageadapter.DealPublisher),
 						storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
 							Period:         publishPeriod,
 							MaxDealsPerMsg: maxDealsPerMsg,
 						})),
 					node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
 						return func() (sealiface.Config, error) {
 							return sealiface.Config{
-								MaxWaitDealsSectors:       1,
+								MaxWaitDealsSectors:       2,
 								MaxSealingSectors:         1,
-								MaxSealingSectorsForDeals: 2,
+								MaxSealingSectorsForDeals: 3,
 								AlwaysKeepUnsealedCopy:    true,
+								WaitDealsDelay:            time.Hour,
 							}, nil
 						}, nil
 					}),
 				),
 				Preseal: PresealGenesis,
 			}}
 
 			// Create a connect client and miner node
 			n, sn := b(t, OneFull, minerDef)
 			client := n[0].FullNode.(*impl.FullNodeAPI)
 			miner := sn[0]
 			s := connectAndStartMining(t, b, blocktime, client, miner)
 			defer s.blockMiner.Stop()
 
+			err := miner.MarketSetAsk(s.ctx, big.Zero(), big.Zero(), 200, 128, 32<<30)
+			require.NoError(t, err)
+
+			checkNoPadding := func() {
+				sl, err := sn[0].SectorsList(s.ctx)
+				require.NoError(t, err)
+
+				sort.Slice(sl, func(i, j int) bool {
+					return sl[i] < sl[j]
+				})
+
+				for _, snum := range sl {
+					si, err := sn[0].SectorsStatus(s.ctx, snum, false)
+					require.NoError(t, err)
+
+					// fmt.Printf("S %d: %+v %s\n", snum, si.Deals, si.State)
+
+					for _, deal := range si.Deals {
+						if deal == 0 {
+							fmt.Printf("sector %d had a padding piece!\n", snum)
+						}
+					}
+				}
+			}
+
 			// Starts a deal and waits until it's published
 			runDealTillSeal := func(rseed int) {
-				res, _, err := CreateClientFile(s.ctx, s.client, rseed)
+				res, _, err := CreateClientFile(s.ctx, s.client, rseed, piece)
 				require.NoError(t, err)
 
 				dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
-				waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
+				waitDealSealed(t, s.ctx, s.miner, s.client, dc, false, true, checkNoPadding)
 			}
 
-	// Run maxDealsPerMsg+1 deals in parallel
-	done := make(chan struct{}, maxDealsPerMsg+1)
-	for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
+			// Run maxDealsPerMsg deals in parallel
+			done := make(chan struct{}, maxDealsPerMsg)
+			for rseed := 0; rseed < int(maxDealsPerMsg); rseed++ {
 				rseed := rseed
 				go func() {
 					runDealTillSeal(rseed)
 					done <- struct{}{}
 				}()
 			}
 
 			// Wait for maxDealsPerMsg of the deals to be published
 			for i := 0; i < int(maxDealsPerMsg); i++ {
 				<-done
 			}
 
+			checkNoPadding()
+
 			sl, err := sn[0].SectorsList(s.ctx)
 			require.NoError(t, err)
-	require.GreaterOrEqual(t, len(sl), 4)
-	require.LessOrEqual(t, len(sl), 5)
+			require.Equal(t, len(sl), expectSectors)
+		}
+	}
+
+	t.Run("4-p1600B", run(1600, 4, 4))
+	t.Run("4-p513B", run(513, 4, 2))
+	if !testing.Short() {
+		t.Run("32-p257B", run(257, 32, 8))
+		t.Run("32-p10B", run(10, 32, 2))
+
+		// fixme: this appears to break data-transfer / markets in some really creative ways
+		//t.Run("128-p10B", run(10, 128, 8))
+	}
 }
 
 func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
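The rewrite turns one hard-coded scenario into a parameterized closure driven through t.Run. A minimal sketch of that table-driven subtest shape (the names and the trivial body here are illustrative, not from the PR):

package example

import "testing"

// run binds one parameter combination to a subtest closure, the same shape
// TestBatchDealInput uses for (piece, deals, expectSectors).
func run(piece, deals, expectSectors int) func(t *testing.T) {
	return func(t *testing.T) {
		// In the real test this spins up a miner, makes `deals` deals of
		// `piece` bytes each, and asserts the resulting sector count.
		if expectSectors <= 0 {
			t.Fatalf("expectSectors must be positive, got %d", expectSectors)
		}
	}
}

func TestShape(t *testing.T) {
	t.Run("4-p1600B", run(1600, 4, 4))
	t.Run("4-p513B", run(513, 4, 2))
	if !testing.Short() {
		t.Run("32-p257B", run(257, 32, 8)) // slower cases only outside -short
	}
}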
@@ -303,12 +349,12 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
 
 	// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
 	time.Sleep(time.Second)
-	waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true)
+	waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true, false, nil)
 
 	deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0)
 
 	time.Sleep(time.Second)
-	waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false)
+	waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false, false, nil)
 
 	// Retrieval
 	info, err := s.client.ClientGetDealInfo(s.ctx, *deal2)
@@ -364,7 +410,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
 	return deal
 }
 
-func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal bool) {
+func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, deal *cid.Cid, noseal, noSealStart bool, cb func()) {
 loop:
 	for {
 		di, err := client.ClientGetDealInfo(ctx, *deal)
@@ -376,7 +422,9 @@ loop:
 			if noseal {
 				return
 			}
-			startSealingWaiting(t, ctx, miner)
+			if !noSealStart {
+				startSealingWaiting(t, ctx, miner)
+			}
 		case storagemarket.StorageDealProposalRejected:
 			t.Fatal("deal rejected")
 		case storagemarket.StorageDealFailing:
@@ -387,8 +435,25 @@ loop:
 			fmt.Println("COMPLETE", di)
 			break loop
 		}
-		fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
+
+		mds, err := miner.MarketListIncompleteDeals(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		var minerState storagemarket.StorageDealStatus
+		for _, md := range mds {
+			if md.DealID == di.DealID {
+				minerState = md.State
+				break
+			}
+		}
+
+		fmt.Printf("Deal %d state: client:%s provider:%s\n", di.DealID, storagemarket.DealStates[di.State], storagemarket.DealStates[minerState])
 		time.Sleep(time.Second / 2)
+
+		if cb != nil {
+			cb()
+		}
 	}
 }
 
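waitDealSealed now accepts an optional callback invoked on every poll, which the batching test uses to assert that no padding pieces appear mid-run. A standalone sketch of the nil-checked-callback polling pattern, with hypothetical names (pollUntil is not from the PR):

package main

import (
	"fmt"
	"time"
)

// pollUntil runs done every interval until it returns true; cb, if non-nil,
// is invoked after each poll, like waitDealSealed's cb parameter.
func pollUntil(done func() bool, cb func()) {
	for !done() {
		time.Sleep(10 * time.Millisecond)
		if cb != nil {
			cb() // hook for extra per-iteration assertions
		}
	}
}

func main() {
	n := 0
	pollUntil(
		func() bool { n++; return n >= 3 },
		func() { fmt.Println("polled, n =", n) },
	)
}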
@@ -430,7 +495,7 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
 	si, err := miner.SectorsStatus(ctx, snum, false)
 	require.NoError(t, err)
 
-	t.Logf("Sector state: %s", si.State)
+	t.Logf("Sector %d state: %s", snum, si.State)
 	if si.State == api.SectorState(sealing.WaitDeals) {
 		require.NoError(t, miner.SectorStartSealing(ctx, snum))
 	}
@@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
 	// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
 	time.Sleep(time.Second)
 
-	waitDealSealed(t, ctx, provider, client, deal, false)
+	waitDealSealed(t, ctx, provider, client, deal, false, false, nil)
 
 	<-minedTwo
 
@@ -44,7 +44,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
 
 	// Create a deal (non-interactive)
 	// client deal --start-epoch=<start epoch> <cid> <miner addr> 1000000attofil <duration>
-	res, _, err := test.CreateClientFile(ctx, clientNode, 1)
+	res, _, err := test.CreateClientFile(ctx, clientNode, 1, 0)
 	require.NoError(t, err)
 	startEpoch := fmt.Sprintf("--start-epoch=%d", 2<<12)
 	dataCid := res.Root
@@ -60,7 +60,7 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode test.TestNode)
 	// <miner addr>
 	// "no" (verified client)
 	// "yes" (confirm deal)
-	res, _, err = test.CreateClientFile(ctx, clientNode, 2)
+	res, _, err = test.CreateClientFile(ctx, clientNode, 2, 0)
 	require.NoError(t, err)
 	dataCid2 := res.Root
 	duration = fmt.Sprintf("%d", build.MinDealDuration/builtin.EpochsInDay)
extern/storage-sealing/fsm.go (vendored, 10 changes)
@@ -51,6 +51,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
 	AddPiece: planOne(
 		on(SectorPieceAdded{}, WaitDeals),
 		apply(SectorStartPacking{}),
+		apply(SectorAddPiece{}),
 		on(SectorAddPieceFailed{}, AddPieceFailed),
 	),
 	Packing: planOne(on(SectorPacked{}, GetTicket)),
@@ -224,6 +225,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
 
 func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
 	for _, event := range events {
+		log.Debugw("sector event", "sector", state.SectorNumber, "type", fmt.Sprintf("%T", event.User), "event", event.User)
+
 		e, err := json.Marshal(event)
 		if err != nil {
 			log.Errorf("marshaling event for logging: %+v", err)
@@ -234,6 +237,10 @@ func (m *Sealing) logEvents(events []statemachine.Event, state *SectorInfo) {
 			continue // don't log on every fsm restart
 		}
 
+		if len(e) > 8000 {
+			e = []byte(string(e[:8000]) + "... truncated")
+		}
+
 		l := Log{
 			Timestamp: uint64(time.Now().Unix()),
 			Message:   string(e),
@@ -582,6 +589,7 @@ func onReturning(mut mutator) func() (mutator, func(*SectorInfo) (bool, error))
 
 func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err error))) func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
 	return func(events []statemachine.Event, state *SectorInfo) (uint64, error) {
+	eloop:
 		for i, event := range events {
 			if gm, ok := event.User.(globalMutator); ok {
 				gm.applyGlobal(state)
@@ -604,6 +612,8 @@ func planOne(ts ...func() (mut mutator, next func(*SectorInfo) (more bool, err e
 				if err != nil || !more {
 					return uint64(i + 1), err
 				}
+
+				continue eloop
 			}
 
 			_, ok := event.User.(Ignorable)
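The new eloop label lets the planner skip straight to the next event once a matching handler has fired; a bare continue would only restart the innermost loop. A standalone illustration of Go's labeled continue (the events and handlers here are invented for the example):

package main

import "fmt"

func main() {
	events := []int{1, 2, 3}
	handlers := []func(int) bool{
		func(e int) bool { return e%2 == 0 }, // handles even events
		func(e int) bool { return e%2 == 1 }, // handles odd events
	}

eloop:
	for _, e := range events {
		for i, h := range handlers {
			if h(e) {
				fmt.Printf("event %d handled by handler %d\n", e, i)
				continue eloop // next event, skipping the remaining handlers
			}
		}
		fmt.Printf("event %d unhandled\n", e)
	}
}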
extern/storage-sealing/input.go (vendored, 47 changes)
@@ -27,6 +27,18 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 
 	m.inputLk.Lock()
 
+	if m.creating != nil && *m.creating == sector.SectorNumber {
+		m.creating = nil
+	}
+
+	sid := m.minerSectorID(sector.SectorNumber)
+
+	if len(m.assignedPieces[sid]) > 0 {
+		m.inputLk.Unlock()
+		// got assigned more pieces in the AddPiece state
+		return ctx.Send(SectorAddPiece{})
+	}
+
 	started, err := m.maybeStartSealing(ctx, sector, used)
 	if err != nil || started {
 		delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
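This is the core behavioral fix: on entering WaitDeals a sector re-checks, under inputLk, whether pieces were assigned to it while it was still in AddPiece, and if so loops straight back instead of idling. A compact sketch of that check-pending-work-before-idling shape, with invented names (manager and sectorID are not from the PR):

package example

import "sync"

type sectorID int

type manager struct {
	lk             sync.Mutex
	assignedPieces map[sectorID][]string
}

// handleWaitDeals returns true when the sector should go straight back to
// AddPiece because work arrived while it was busy, mirroring the new check
// at the top of Sealing.handleWaitDeals.
func (m *manager) handleWaitDeals(sid sectorID) (backToAddPiece bool) {
	m.lk.Lock()
	defer m.lk.Unlock()

	if len(m.assignedPieces[sid]) > 0 {
		// got assigned more pieces while in the AddPiece state
		return true
	}
	// ...otherwise register the sector as open and wait for new deals...
	return false
}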
@@ -36,16 +48,16 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
 		return err
 	}
 
-	m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
-		used: used,
-		maybeAccept: func(cid cid.Cid) error {
-			// todo check deal start deadline (configurable)
-
-			sid := m.minerSectorID(sector.SectorNumber)
-			m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
-
-			return ctx.Send(SectorAddPiece{})
-		},
+	if _, has := m.openSectors[sid]; !has {
+		m.openSectors[sid] = &openSector{
+			used: used,
+			maybeAccept: func(cid cid.Cid) error {
+				// todo check deal start deadline (configurable)
+				m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
+
+				return ctx.Send(SectorAddPiece{})
+			},
+		}
 	}
 
 	go func() {
@@ -350,11 +362,19 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 			continue
 		}
 
+		avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used
+
+		if mt.size > avail {
+			continue
+		}
+
 		err := m.openSectors[mt.sector].maybeAccept(mt.deal)
 		if err != nil {
 			m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
 		}
 
+		m.openSectors[mt.sector].used += mt.padding + mt.size
+
 		m.pendingPieces[mt.deal].assigned = true
 		delete(toAssign, mt.deal)
 
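The second fix: before accepting a deal into an open sector, updateInput now checks that the piece fits in the sector's remaining unpadded space, and charges used for padding plus piece size once accepted. Unpadded capacity is 127/128 of the padded size; a quick check of that arithmetic, assuming the go-state-types abi package (the concrete numbers are illustrative):

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/abi"
)

func main() {
	// A 2 KiB padded sector holds 2048 - 2048/128 = 2032 unpadded bytes.
	ssize := abi.SectorSize(2048)
	avail := abi.PaddedPieceSize(ssize).Unpadded()
	fmt.Println(avail) // 2032

	used := abi.UnpaddedPieceSize(1016)
	pieceSize := abi.UnpaddedPieceSize(1270)
	if pieceSize > avail-used {
		// 1270 > 2032-1016, so this piece is left for the next sector.
		fmt.Println("piece does not fit in the open sector")
	}
}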
@@ -362,8 +382,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 			log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
 			continue
 		}
-
-		delete(m.openSectors, mt.sector)
 	}
 
 	if len(toAssign) > 0 {
@@ -378,6 +396,10 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
 func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
 	m.startupWait.Wait()
 
+	if m.creating != nil {
+		return nil // new sector is being created right now
+	}
+
 	cfg, err := m.getConfig()
 	if err != nil {
 		return xerrors.Errorf("getting storage config: %w", err)
@@ -396,6 +418,8 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
 		return err
 	}
 
+	m.creating = &sid
+
 	log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
 	return m.sectors.Send(uint64(sid), SectorStart{
 		ID: sid,
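tryCreateDealSector now records the in-flight sector number in m.creating and bails out while it is set, so concurrent deal arrivals cannot spawn several new sectors at once; handleWaitDeals clears the marker when the sector is ready. A compact sketch of that single-flight guard, with hypothetical names (creator is not from the PR):

package example

import "sync"

type creator struct {
	lk       sync.Mutex
	creating *int // number of the sector being created, nil when idle
	next     int
}

// tryCreate starts at most one sector creation at a time; callers that
// lose the race simply return and retry when the next deal arrives.
func (c *creator) tryCreate() (started bool) {
	c.lk.Lock()
	defer c.lk.Unlock()

	if c.creating != nil {
		return false // a new sector is being created right now
	}
	sid := c.next
	c.next++
	c.creating = &sid
	return true
}

// doneCreating mirrors handleWaitDeals clearing m.creating once the new
// sector can accept pieces.
func (c *creator) doneCreating(sid int) {
	c.lk.Lock()
	defer c.lk.Unlock()
	if c.creating != nil && *c.creating == sid {
		c.creating = nil
	}
}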
@@ -426,6 +450,7 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
 func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
 	m.startupWait.Wait()
 
+	log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
 	return m.sectors.Send(uint64(sid), SectorStartPacking{})
 }
 
extern/storage-sealing/sealing.go (vendored, 1 change)
@@ -98,6 +98,7 @@ type Sealing struct {
 	sectorTimers   map[abi.SectorID]*time.Timer
 	pendingPieces  map[cid.Cid]*pendingPiece
 	assignedPieces map[abi.SectorID][]cid.Cid
+	creating       *abi.SectorNumber // used to prevent a race where we could create a new sector more than once
 
 	upgradeLk sync.Mutex
 	toUpgrade map[abi.SectorNumber]struct{}
extern/storage-sealing/states_sealing.go (vendored, 2 changes)
@@ -37,7 +37,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
 	}
 
 	// todo: return to the sealing queue (this is extremely unlikely to happen)
-	pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector entered packing state early"))
+	pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector %d entered packing state early", sector.SectorNumber))
 }
 
 delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
@@ -245,13 +245,13 @@ func (sm *StorageMinerAPI) SectorsList(context.Context) ([]abi.SectorNumber, err
 		return nil, err
 	}
 
-	out := make([]abi.SectorNumber, len(sectors))
-	for i, sector := range sectors {
+	out := make([]abi.SectorNumber, 0, len(sectors))
+	for _, sector := range sectors {
 		if sector.State == sealing.UndefinedSectorState {
 			continue // sector ID not set yet
 		}
 
-		out[i] = sector.SectorNumber
+		out = append(out, sector.SectorNumber)
 	}
 	return out, nil
 }
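The SectorsList fix is a classic Go slice bug: make([]T, n) plus indexed writes under a loop that continues leaves zero values at every skipped index, so the API reported phantom sector 0s. Allocating with zero length and appending only the kept entries avoids that. A self-contained demonstration (the sample data is invented):

package main

import "fmt"

func main() {
	sectors := []int{5, 0, 7, 0, 9} // 0 stands in for UndefinedSectorState

	// Buggy shape: skipped indices remain zero values in the output.
	bad := make([]int, len(sectors))
	for i, s := range sectors {
		if s == 0 {
			continue
		}
		bad[i] = s
	}
	fmt.Println(bad) // [5 0 7 0 9] — the zeros are never-written artifacts

	// Fixed shape: length 0, capacity len(sectors), append what we keep.
	good := make([]int, 0, len(sectors))
	for _, s := range sectors {
		if s == 0 {
			continue
		}
		good = append(good, s)
	}
	fmt.Println(good) // [5 7 9]
}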
@@ -63,6 +63,24 @@ func TestAPIDealFlow(t *testing.T) {
 	})
 }
 
+func TestBatchDealInput(t *testing.T) {
+	logging.SetLogLevel("miner", "ERROR")
+	logging.SetLogLevel("chainstore", "ERROR")
+	logging.SetLogLevel("chain", "ERROR")
+	logging.SetLogLevel("sub", "ERROR")
+	logging.SetLogLevel("storageminer", "ERROR")
+	logging.SetLogLevel("sectors", "DEBUG")
+
+	blockTime := 10 * time.Millisecond
+
+	// For these tests where the block time is artificially short, just use
+	// a deal start epoch that is guaranteed to be far enough in the future
+	// so that the deal starts sealing in time
+	dealStartEpoch := abi.ChainEpoch(2 << 12)
+
+	test.TestBatchDealInput(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
+}
+
 func TestAPIDealFlowReal(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode")