marketadapter: Handle batch sealing messages

This commit is contained in:
Łukasz Magiera 2021-05-19 20:07:20 +02:00
parent 2a1b359ede
commit e088c71b9a
4 changed files with 79 additions and 32 deletions

View File

@ -435,6 +435,8 @@ func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNod
require.NoError(t, miner.SectorStartSealing(ctx, snum))
}
}
flushSealingBatches(t, ctx, miner)
}
func testRetrieval(t *testing.T, ctx context.Context, client api.FullNode, fcid cid.Cid, piece *cid.Cid, carExport bool, data []byte) {

View File

@ -201,6 +201,20 @@ func TestPledgeSector(t *testing.T, b APIBuilder, blocktime time.Duration, nSect
<-done
}
func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNode) {
pcb, err := miner.SectorPreCommitFlush(ctx)
require.NoError(t, err)
if pcb != nil {
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
}
cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err)
if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb)
}
}
func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n, existing int, blockNotif <-chan struct{}) {
for i := 0; i < n; i++ {
if i%3 == 0 && blockNotif != nil {
@ -234,17 +248,7 @@ func pledgeSectors(t *testing.T, ctx context.Context, miner TestStorageNode, n,
}
for len(toCheck) > 0 {
pcb, err := miner.SectorPreCommitFlush(ctx)
require.NoError(t, err)
if pcb != nil {
fmt.Printf("PRECOMMIT BATCH: %s\n", *pcb)
}
cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err)
if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb)
}
flushSealingBatches(t, ctx, miner)
for n := range toCheck {
st, err := miner.SectorsStatus(ctx, n, false)

View File

@ -542,12 +542,12 @@ func TestSealAndVerifyAggregate(t *testing.T) {
aggStart := time.Now()
avi.Proof, err = ProofVerifier.AggregateSealProofs(avi, toAggregate)
avi.Proof, err = ProofProver.AggregateSealProofs(avi, toAggregate)
require.NoError(t, err)
aggDone := time.Now()
_, err = ProofVerifier.AggregateSealProofs(avi, toAggregate)
_, err = ProofProver.AggregateSealProofs(avi, toAggregate)
require.NoError(t, err)
aggHot := time.Now()

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
@ -109,7 +110,7 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
// Watch for a pre-commit message to the provider.
matchEvent := func(msg *types.Message) (bool, error) {
matched := msg.To == provider && msg.Method == miner.Methods.PreCommitSector
matched := msg.To == provider && (msg.Method == miner.Methods.PreCommitSector || msg.Method == miner.Methods.PreCommitSectorBatch)
return matched, nil
}
@ -137,12 +138,6 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
return true, nil
}
// Extract the message parameters
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
// When there is a reorg, the deal ID may change, so get the
// current deal ID from the publish message CID
res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid)
@ -150,13 +145,40 @@ func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context,
return false, err
}
// Check through the deal IDs associated with this message
for _, did := range params.DealIDs {
if did == res.DealID {
// Found the deal ID in this message. Callback with the sector ID.
cb(params.SectorNumber, false, nil)
return false, nil
// Extract the message parameters
switch msg.Method {
case miner.Methods.PreCommitSector:
var params miner.SectorPreCommitInfo
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
// Check through the deal IDs associated with this message
for _, did := range params.DealIDs {
if did == res.DealID {
// Found the deal ID in this message. Callback with the sector ID.
cb(params.SectorNumber, false, nil)
return false, nil
}
}
case miner.Methods.PreCommitSectorBatch:
var params miner5.PreCommitSectorBatchParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("unmarshal pre commit: %w", err)
}
for _, precommit := range params.Sectors {
// Check through the deal IDs associated with this message
for _, did := range precommit.DealIDs {
if did == res.DealID {
// Found the deal ID in this message. Callback with the sector ID.
cb(precommit.SectorNumber, false, nil)
return false, nil
}
}
}
default:
return false, xerrors.Errorf("unexpected method %d", msg.Method)
}
// Didn't find the deal ID in this message, so keep looking
@ -207,16 +229,35 @@ func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, pr
// Match a prove-commit sent to the provider with the given sector number
matchEvent := func(msg *types.Message) (matched bool, err error) {
if msg.To != provider || msg.Method != miner.Methods.ProveCommitSector {
if msg.To != provider {
return false, nil
}
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
switch msg.Method {
case miner.Methods.ProveCommitSector:
var params miner.ProveCommitSectorParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
return params.SectorNumber == sectorNumber, nil
return params.SectorNumber == sectorNumber, nil
case miner.Methods.ProveCommitAggregate:
var params miner5.ProveCommitAggregateParams
if err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)); err != nil {
return false, xerrors.Errorf("failed to unmarshal prove commit sector params: %w", err)
}
set, err := params.SectorNumbers.IsSet(uint64(sectorNumber))
if err != nil {
return false, xerrors.Errorf("checking if sectorNumber is set in commit aggregate message: %w", err)
}
return set, nil
default:
return false, nil
}
}
// The deal must be accepted by the deal proposal start epoch, so timeout