Make commit batcher more robust

This commit is contained in:
Łukasz Magiera 2021-06-01 11:56:19 +02:00
parent b149eb8d02
commit 9fcb564bef
9 changed files with 201 additions and 53 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
// StorageMiner is a low-level interface to the Filecoin network storage miner node // StorageMiner is a low-level interface to the Filecoin network storage miner node
@ -87,7 +88,7 @@ type StorageMiner interface {
SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin
// SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit. // SectorCommitFlush immediately sends a Commit message with sectors aggregated for Commit.
// Returns null if message wasn't sent // Returns null if message wasn't sent
SectorCommitFlush(ctx context.Context) (*cid.Cid, error) //perm:admin SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) //perm:admin
// SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message // SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message
SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin

View File

@ -28,6 +28,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
marketevents "github.com/filecoin-project/lotus/markets/loggers" marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -639,7 +640,7 @@ type StorageMinerStruct struct {
SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"`
SectorCommitFlush func(p0 context.Context) (*cid.Cid, error) `perm:"admin"` SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"`
SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"` SectorCommitPending func(p0 context.Context) ([]abi.SectorID, error) `perm:"admin"`
@ -1931,7 +1932,7 @@ func (s *StorageMinerStruct) SealingSchedDiag(p0 context.Context, p1 bool) (inte
return s.Internal.SealingSchedDiag(p0, p1) return s.Internal.SealingSchedDiag(p0, p1)
} }
func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) (*cid.Cid, error) { func (s *StorageMinerStruct) SectorCommitFlush(p0 context.Context) ([]sealiface.CommitBatchRes, error) {
return s.Internal.SectorCommitFlush(p0) return s.Internal.SectorCommitFlush(p0)
} }

View File

@ -178,7 +178,7 @@ func TestPledgeBatching(t *testing.T, b APIBuilder, blocktime time.Duration, nSe
cb, err := miner.SectorCommitFlush(ctx) cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err) require.NoError(t, err)
if cb != nil { if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb) fmt.Printf("COMMIT BATCH: %+v\n", cb)
} }
} }
@ -325,7 +325,7 @@ func flushSealingBatches(t *testing.T, ctx context.Context, miner TestStorageNod
cb, err := miner.SectorCommitFlush(ctx) cb, err := miner.SectorCommitFlush(ctx)
require.NoError(t, err) require.NoError(t, err)
if cb != nil { if cb != nil {
fmt.Printf("COMMIT BATCH: %s\n", *cb) fmt.Printf("COMMIT BATCH: %+v\n", cb)
} }
} }

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
const arp = abi.RegisteredAggregationProof_SnarkPackV1 const arp = abi.RegisteredAggregationProof_SnarkPackV1
@ -30,6 +31,9 @@ type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, TipSetToken) (big.Int, error)
} }
type AggregateInput struct { type AggregateInput struct {
@ -49,10 +53,10 @@ type CommitBatcher struct {
deadlines map[abi.SectorNumber]time.Time deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]AggregateInput todo map[abi.SectorNumber]AggregateInput
waiting map[abi.SectorNumber][]chan cid.Cid waiting map[abi.SectorNumber][]chan sealiface.CommitBatchRes
notify, stop, stopped chan struct{} notify, stop, stopped chan struct{}
force chan chan *cid.Cid force chan chan []sealiface.CommitBatchRes
lk sync.Mutex lk sync.Mutex
} }
@ -68,10 +72,10 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
deadlines: map[abi.SectorNumber]time.Time{}, deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]AggregateInput{}, todo: map[abi.SectorNumber]AggregateInput{},
waiting: map[abi.SectorNumber][]chan cid.Cid{}, waiting: map[abi.SectorNumber][]chan sealiface.CommitBatchRes{},
notify: make(chan struct{}, 1), notify: make(chan struct{}, 1),
force: make(chan chan *cid.Cid), force: make(chan chan []sealiface.CommitBatchRes),
stop: make(chan struct{}), stop: make(chan struct{}),
stopped: make(chan struct{}), stopped: make(chan struct{}),
} }
@ -82,8 +86,8 @@ func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBat
} }
func (b *CommitBatcher) run() { func (b *CommitBatcher) run() {
var forceRes chan *cid.Cid var forceRes chan []sealiface.CommitBatchRes
var lastMsg *cid.Cid var lastMsg []sealiface.CommitBatchRes
cfg, err := b.getConfig() cfg, err := b.getConfig()
if err != nil { if err != nil {
@ -111,7 +115,7 @@ func (b *CommitBatcher) run() {
} }
var err error var err error
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) lastMsg, err = b.maybeStartBatch(sendAboveMax, sendAboveMin)
if err != nil { if err != nil {
log.Warnw("CommitBatcher processBatch error", "error", err) log.Warnw("CommitBatcher processBatch error", "error", err)
} }
@ -159,12 +163,9 @@ func (b *CommitBatcher) batchWait(maxWait, slack time.Duration) <-chan time.Time
return time.After(wait) return time.After(wait)
} }
func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) { func (b *CommitBatcher) maybeStartBatch(notif, after bool) ([]sealiface.CommitBatchRes, error) {
b.lk.Lock() b.lk.Lock()
defer b.lk.Unlock() defer b.lk.Unlock()
params := miner5.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
}
total := len(b.todo) total := len(b.todo)
if total == 0 { if total == 0 {
@ -184,6 +185,45 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, nil return nil, nil
} }
var res []sealiface.CommitBatchRes
if total < cfg.MinCommitBatch || total < miner5.PreCommitSectorBatchMaxSize {
res, err = b.processIndividually()
} else {
res, err = b.processBatch(cfg)
}
if err != nil && len(res) == 0 {
return nil, err
}
for _, r := range res {
if err != nil {
r.Error = err.Error()
}
for _, sn := range r.Sectors {
for _, ch := range b.waiting[sn] {
ch <- r // buffered
}
delete(b.waiting, sn)
delete(b.todo, sn)
delete(b.deadlines, sn)
}
}
return res, nil
}
func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
total := len(b.todo)
var res sealiface.CommitBatchRes
params := miner5.ProveCommitAggregateParams{
SectorNumbers: bitfield.New(),
}
proofs := make([][]byte, 0, total) proofs := make([][]byte, 0, total)
infos := make([]proof5.AggregateSealVerifyInfo, 0, total) infos := make([]proof5.AggregateSealVerifyInfo, 0, total)
@ -202,12 +242,13 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
}) })
for _, info := range infos { for _, info := range infos {
res.Sectors = append(res.Sectors, info.Number)
proofs = append(proofs, b.todo[info.Number].proof) proofs = append(proofs, b.todo[info.Number].proof)
} }
mid, err := address.IDFromAddress(b.maddr) mid, err := address.IDFromAddress(b.maddr)
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting miner id: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
} }
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{ params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
@ -217,55 +258,107 @@ func (b *CommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
Infos: infos, Infos: infos,
}, proofs) }, proofs)
if err != nil { if err != nil {
return nil, xerrors.Errorf("aggregating proofs: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("aggregating proofs: %w", err)
} }
enc := new(bytes.Buffer) enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil { if err := params.MarshalCBOR(enc); err != nil {
return nil, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't serialize ProveCommitAggregateParams: %w", err)
} }
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get miner info: %w", err)
}
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee)
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
// todo: collateral
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}
res.Msg = &mcid
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos))
return []sealiface.CommitBatchRes{res}, nil
}
func (b *CommitBatcher) processIndividually() ([]sealiface.CommitBatchRes, error) {
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil { if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err) return nil, xerrors.Errorf("couldn't get miner info: %w", err)
} }
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, b.feeCfg.MaxCommitGasFee, b.feeCfg.MaxCommitGasFee) tok, _, err := b.api.ChainHead(b.mctx)
if err != nil { if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err) return nil, err
} }
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitAggregate, big.Zero(), b.feeCfg.MaxCommitGasFee, enc.Bytes()) var res []sealiface.CommitBatchRes
if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err)
}
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "todo", total, "sectors", len(infos)) for sn, info := range b.todo {
r := sealiface.CommitBatchRes{
err = params.SectorNumbers.ForEach(func(us uint64) error { Sectors: []abi.SectorNumber{sn},
sn := abi.SectorNumber(us)
for _, ch := range b.waiting[sn] {
ch <- mcid // buffered
} }
delete(b.waiting, sn)
delete(b.todo, sn) mcid, err := b.processSingle(mi, sn, info, tok)
delete(b.deadlines, sn) if err != nil {
return nil log.Errorf("process single error: %+v", err) // todo: return to user
}) r.FailedSectors[sn] = err.Error()
if err != nil { } else {
return nil, xerrors.Errorf("done sectors foreach: %w", err) r.Msg = &mcid
}
res = append(res, r)
} }
return &mcid, nil return res, nil
}
func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, info AggregateInput, tok TipSetToken) (cid.Cid, error) {
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.proof,
}
if err := params.MarshalCBOR(enc); err != nil {
return cid.Undef, xerrors.Errorf("marshaling commit params: %w", err)
}
collateral, err := b.getSectorCollateral(sn, tok)
if err != nil {
return cid.Undef, err
}
goodFunds := big.Add(collateral, b.feeCfg.MaxCommitGasFee)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral)
if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.ProveCommitSector, collateral, b.feeCfg.MaxCommitGasFee, enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}
return mcid, nil
} }
// register commit, wait for batch message, return message CID // register commit, wait for batch message, return message CID
func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (mcid cid.Cid, err error) { func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in AggregateInput) (res sealiface.CommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx) _, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil { if err != nil {
log.Errorf("getting chain head: %s", err) log.Errorf("getting chain head: %s", err)
return cid.Undef, nil return sealiface.CommitBatchRes{}, nil
} }
sn := s.SectorNumber sn := s.SectorNumber
@ -274,7 +367,7 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.todo[sn] = in b.todo[sn] = in
sent := make(chan cid.Cid, 1) sent := make(chan sealiface.CommitBatchRes, 1)
b.waiting[sn] = append(b.waiting[sn], sent) b.waiting[sn] = append(b.waiting[sn], sent)
select { select {
@ -284,15 +377,15 @@ func (b *CommitBatcher) AddCommit(ctx context.Context, s SectorInfo, in Aggregat
b.lk.Unlock() b.lk.Unlock()
select { select {
case c := <-sent: case r := <-sent:
return c, nil return r, nil
case <-ctx.Done(): case <-ctx.Done():
return cid.Undef, ctx.Err() return sealiface.CommitBatchRes{}, ctx.Err()
} }
} }
func (b *CommitBatcher) Flush(ctx context.Context) (*cid.Cid, error) { func (b *CommitBatcher) Flush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
resCh := make(chan *cid.Cid, 1) resCh := make(chan []sealiface.CommitBatchRes, 1)
select { select {
case b.force <- resCh: case b.force <- resCh:
select { select {
@ -364,3 +457,25 @@ func getSectorDeadline(curEpoch abi.ChainEpoch, si SectorInfo) time.Time {
return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second) return time.Now().Add(time.Duration(deadlineEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second)
} }
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok TipSetToken) (abi.TokenAmount, error) {
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, sn, tok)
if err != nil {
return big.Zero(), xerrors.Errorf("getting precommit info: %w", err)
}
if pci == nil {
return big.Zero(), xerrors.Errorf("precommit info not found on chain")
}
collateral, err := b.api.StateMinerInitialPledgeCollateral(b.mctx, b.maddr, pci.Info, tok)
if err != nil {
return big.Zero(), xerrors.Errorf("getting initial pledge collateral: %w", err)
}
collateral = big.Sub(collateral, pci.PreCommitDeposit)
if collateral.LessThan(big.Zero()) {
collateral = big.Zero()
}
return collateral, nil
}

View File

@ -0,0 +1,16 @@
package sealiface
import (
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
)
type CommitBatchRes struct {
Sectors []abi.SectorNumber
FailedSectors map[abi.SectorNumber]string
Msg *cid.Cid
Error string // if set, means that all sectors are failed, implies Msg==nil
}

View File

@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
const SectorStorePrefix = "/sectors" const SectorStorePrefix = "/sectors"
@ -214,7 +215,7 @@ func (m *Sealing) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, e
return m.precommiter.Pending(ctx) return m.precommiter.Pending(ctx)
} }
func (m *Sealing) CommitFlush(ctx context.Context) (*cid.Cid, error) { func (m *Sealing) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return m.commiter.Flush(ctx) return m.commiter.Flush(ctx)
} }

View File

@ -581,7 +581,7 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
} }
mcid, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
info: proof.AggregateSealVerifyInfo{ info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber, Number: sector.SectorNumber,
Randomness: sector.TicketValue, Randomness: sector.TicketValue,
@ -596,7 +596,19 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
} }
return ctx.Send(SectorCommitAggregateSent{mcid}) if res.Error != "" {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate error: %s", res.Error)})
}
if e, found := res.FailedSectors[sector.SectorNumber]; found {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)})
}
if res.Msg == nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")})
}
return ctx.Send(SectorCommitAggregateSent{*res.Msg})
} }
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
@ -386,7 +387,7 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect
return sm.Miner.MarkForUpgrade(id) return sm.Miner.MarkForUpgrade(id)
} }
func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) (*cid.Cid, error) { func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return sm.Miner.CommitFlush(ctx) return sm.Miner.CommitFlush(ctx)
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
) )
// TODO: refactor this to be direct somehow // TODO: refactor this to be direct somehow
@ -67,7 +68,7 @@ func (m *Miner) SectorPreCommitPending(ctx context.Context) ([]abi.SectorID, err
return m.sealing.SectorPreCommitPending(ctx) return m.sealing.SectorPreCommitPending(ctx)
} }
func (m *Miner) CommitFlush(ctx context.Context) (*cid.Cid, error) { func (m *Miner) CommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) {
return m.sealing.CommitFlush(ctx) return m.sealing.CommitFlush(ctx)
} }