Maybe working precommit batching

This commit is contained in:
Łukasz Magiera 2021-05-18 17:21:10 +02:00
parent d92c5e1001
commit f66b9c5663
6 changed files with 99 additions and 50 deletions

View File

@ -291,6 +291,8 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.PreCommit2}, {col: color.FgYellow, state: sealing.PreCommit2},
{col: color.FgYellow, state: sealing.PreCommitting}, {col: color.FgYellow, state: sealing.PreCommitting},
{col: color.FgYellow, state: sealing.PreCommitWait}, {col: color.FgYellow, state: sealing.PreCommitWait},
{col: color.FgYellow, state: sealing.SubmitPreCommitBatch},
{col: color.FgYellow, state: sealing.PreCommitBatchWait},
{col: color.FgYellow, state: sealing.WaitSeed}, {col: color.FgYellow, state: sealing.WaitSeed},
{col: color.FgYellow, state: sealing.Committing}, {col: color.FgYellow, state: sealing.Committing},
{col: color.FgYellow, state: sealing.SubmitCommit}, {col: color.FgYellow, state: sealing.SubmitCommit},

View File

@ -71,6 +71,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
), ),
PreCommitting: planOne( PreCommitting: planOne(
on(SectorPreCommitBatch{}, SubmitPreCommitBatch),
on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed),
on(SectorPreCommitted{}, PreCommitWait), on(SectorPreCommitted{}, PreCommitWait),
on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorChainPreCommitFailed{}, PreCommitFailed),
@ -340,6 +341,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handlePreCommit2, processed, nil return m.handlePreCommit2, processed, nil
case PreCommitting: case PreCommitting:
return m.handlePreCommitting, processed, nil return m.handlePreCommitting, processed, nil
case SubmitPreCommitBatch:
return m.handleSubmitPreCommitBatch, processed, nil
case PreCommitWait: case PreCommitWait:
return m.handlePreCommitWait, processed, nil return m.handlePreCommitWait, processed, nil
case WaitSeed: case WaitSeed:
@ -348,12 +351,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleCommitting, processed, nil return m.handleCommitting, processed, nil
case SubmitCommit: case SubmitCommit:
return m.handleSubmitCommit, processed, nil return m.handleSubmitCommit, processed, nil
case SubmitCommitAggregate:
return m.handleSubmitCommitAggregate, processed, nil
case CommitAggregateWait: case CommitAggregateWait:
fallthrough fallthrough
case CommitWait: case CommitWait:
return m.handleCommitWait, processed, nil return m.handleCommitWait, processed, nil
case SubmitCommitAggregate:
return m.handleSubmitCommitAggregate, processed, nil
case FinalizeSector: case FinalizeSector:
return m.handleFinalizeSector, processed, nil return m.handleFinalizeSector, processed, nil

View File

@ -150,6 +150,10 @@ func (evt SectorPreCommit2) apply(state *SectorInfo) {
state.CommR = &commr state.CommR = &commr
} }
type SectorPreCommitBatch struct{}
func (evt SectorPreCommitBatch) apply(*SectorInfo) {}
type SectorPreCommitLanded struct { type SectorPreCommitLanded struct {
TipSet TipSetToken TipSet TipSetToken
} }

View File

@ -20,20 +20,17 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
) )
var (
// TODO: config
PreCommitBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k
PreCommitBatchMin uint64 = 1
PreCommitBatchWait = 5 * time.Minute
)
type PreCommitBatcherApi interface { type PreCommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
} }
type preCommitEntry struct {
deposit abi.TokenAmount
pci *miner0.SectorPreCommitInfo
}
type PreCommitBatcher struct { type PreCommitBatcher struct {
api PreCommitBatcherApi api PreCommitBatcherApi
maddr address.Address maddr address.Address
@ -43,7 +40,7 @@ type PreCommitBatcher struct {
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
deadlines map[abi.SectorNumber]time.Time deadlines map[abi.SectorNumber]time.Time
todo map[abi.SectorNumber]*miner0.SectorPreCommitInfo todo map[abi.SectorNumber]*preCommitEntry
waiting map[abi.SectorNumber][]chan cid.Cid waiting map[abi.SectorNumber][]chan cid.Cid
notify, stop, stopped chan struct{} notify, stop, stopped chan struct{}
@ -61,7 +58,7 @@ func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCom
getConfig: getConfig, getConfig: getConfig,
deadlines: map[abi.SectorNumber]time.Time{}, deadlines: map[abi.SectorNumber]time.Time{},
todo: map[abi.SectorNumber]*miner0.SectorPreCommitInfo{}, todo: map[abi.SectorNumber]*preCommitEntry{},
waiting: map[abi.SectorNumber][]chan cid.Cid{}, waiting: map[abi.SectorNumber][]chan cid.Cid{},
notify: make(chan struct{}, 1), notify: make(chan struct{}, 1),
@ -172,8 +169,11 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, nil return nil, nil
} }
deposit := big.Zero()
for _, p := range b.todo { for _, p := range b.todo {
params.Sectors = append(params.Sectors, p) params.Sectors = append(params.Sectors, p.pci)
deposit = big.Add(deposit, p.deposit)
} }
enc := new(bytes.Buffer) enc := new(bytes.Buffer)
@ -186,12 +186,14 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, xerrors.Errorf("couldn't get miner info: %w", err) return nil, xerrors.Errorf("couldn't get miner info: %w", err)
} }
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, b.feeCfg.MaxPreCommitGasFee, b.feeCfg.MaxPreCommitGasFee) goodFunds := big.Add(deposit, b.feeCfg.MaxPreCommitGasFee)
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil { if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err) return nil, xerrors.Errorf("no good address found: %w", err)
} }
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, big.Zero(), b.feeCfg.MaxPreCommitGasFee, enc.Bytes()) mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.PreCommitSectorBatch, deposit, b.feeCfg.MaxPreCommitGasFee, enc.Bytes())
if err != nil { if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err) return nil, xerrors.Errorf("sending message failed: %w", err)
} }
@ -213,7 +215,7 @@ func (b *PreCommitBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
} }
// register PreCommit, wait for batch message, return message CID // register PreCommit, wait for batch message, return message CID
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) { func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner0.SectorPreCommitInfo) (mcid cid.Cid, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx) _, curEpoch, err := b.api.ChainHead(b.mctx)
if err != nil { if err != nil {
log.Errorf("getting chain head: %s", err) log.Errorf("getting chain head: %s", err)
@ -224,7 +226,10 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, in *m
b.lk.Lock() b.lk.Lock()
b.deadlines[sn] = getSectorDeadline(curEpoch, s) b.deadlines[sn] = getSectorDeadline(curEpoch, s)
b.todo[sn] = in b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
}
sent := make(chan cid.Cid, 1) sent := make(chan cid.Cid, 1)
b.waiting[sn] = append(b.waiting[sn], sent) b.waiting[sn] = append(b.waiting[sn], sent)
@ -271,7 +276,7 @@ func (b *PreCommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error)
for _, s := range b.todo { for _, s := range b.todo {
res = append(res, abi.SectorID{ res = append(res, abi.SectorID{
Miner: abi.ActorID(mid), Miner: abi.ActorID(mid),
Number: s.SectorNumber, Number: s.pci.SectorNumber,
}) })
} }

View File

@ -13,6 +13,8 @@ var ExistSectorStateList = map[SectorState]struct{}{
PreCommit2: {}, PreCommit2: {},
PreCommitting: {}, PreCommitting: {},
PreCommitWait: {}, PreCommitWait: {},
SubmitPreCommitBatch: {},
PreCommitBatchWait: {},
WaitSeed: {}, WaitSeed: {},
Committing: {}, Committing: {},
SubmitCommit: {}, SubmitCommit: {},
@ -54,8 +56,13 @@ const (
GetTicket SectorState = "GetTicket" // generate ticket GetTicket SectorState = "GetTicket" // generate ticket
PreCommit1 SectorState = "PreCommit1" // do PreCommit1 PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit2 PreCommit2 SectorState = "PreCommit2" // do PreCommit2
PreCommitting SectorState = "PreCommitting" // on chain pre-commit PreCommitting SectorState = "PreCommitting" // on chain pre-commit
PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain PreCommitWait SectorState = "PreCommitWait" // waiting for precommit to land on chain
SubmitPreCommitBatch SectorState = "SubmitPreCommitBatch"
PreCommitBatchWait SectorState = "PreCommitBatchWait"
WaitSeed SectorState = "WaitSeed" // waiting for seed WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing" // compute PoRep Committing SectorState = "Committing" // compute PoRep
@ -99,7 +106,7 @@ func toStatState(st SectorState) statSectorState {
switch st { switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece: case UndefinedSectorState, Empty, WaitDeals, AddPiece:
return sstStaging return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector: case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, FinalizeSector:
return sstSealing return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving return sstProving

View File

@ -226,56 +226,50 @@ func (m *Sealing) remarkForUpgrade(sid abi.SectorNumber) {
} }
} }
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitInfo, big.Int, TipSetToken, error) {
tok, height, err := m.api.ChainHead(ctx.Context()) tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil { if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil return nil, big.Zero(), nil, nil
}
mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
} }
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil { if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
switch err := err.(type) { switch err := err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil return nil, big.Zero(), nil, nil
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too) case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
case *ErrExpiredTicket: case *ErrExpiredTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)})
case *ErrBadTicket: case *ErrBadTicket:
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
case *ErrInvalidDeals: case *ErrInvalidDeals:
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err) log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting}) return nil, big.Zero(), nil, ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting})
case *ErrExpiredDeals: case *ErrExpiredDeals:
return ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrPrecommitOnChain: case *ErrPrecommitOnChain:
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit return nil, big.Zero(), nil, ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
case *ErrSectorNumberAllocated: case *ErrSectorNumberAllocated:
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
// TODO: check if the sector is committed (not sure how we'd end up here) // TODO: check if the sector is committed (not sure how we'd end up here)
return nil return nil, big.Zero(), nil, nil
default: default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err) return nil, big.Zero(), nil, xerrors.Errorf("checkPrecommit sanity check error: %w", err)
} }
} }
expiration, err := m.pcp.Expiration(ctx.Context(), sector.Pieces...) expiration, err := m.pcp.Expiration(ctx.Context(), sector.Pieces...)
if err != nil { if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)})
} }
// Sectors must last _at least_ MinSectorExpiration + MaxSealDuration. // Sectors must last _at least_ MinSectorExpiration + MaxSealDuration.
// TODO: The "+10" allows the pre-commit to take 10 blocks to be accepted. // TODO: The "+10" allows the pre-commit to take 10 blocks to be accepted.
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
if err != nil { if err != nil {
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)}) return nil, big.Zero(), nil, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)})
} }
msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType) msd := policy.GetMaxProveCommitDuration(actors.VersionForNetwork(nv), sector.SectorType)
@ -297,17 +291,33 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
depositMinimum := m.tryUpgradeSector(ctx.Context(), params) depositMinimum := m.tryUpgradeSector(ctx.Context(), params)
collateral, err := m.api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok)
if err != nil {
return nil, big.Zero(), nil, xerrors.Errorf("getting initial pledge collateral: %w", err)
}
deposit := big.Max(depositMinimum, collateral)
return params, deposit, tok, nil
}
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
params, deposit, tok, err := m.preCommitParams(ctx, sector)
if err != nil {
return err
}
enc := new(bytes.Buffer) enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil { if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)})
} }
collateral, err := m.api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok) mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok)
if err != nil { if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err) log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
} }
deposit := big.Max(depositMinimum, collateral)
goodFunds := big.Add(deposit, m.feeCfg.MaxPreCommitGasFee) goodFunds := big.Add(deposit, m.feeCfg.MaxPreCommitGasFee)
from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit) from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit)
@ -327,6 +337,24 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorPreCommitted{Message: mcid, PreCommitDeposit: deposit, PreCommitInfo: *params}) return ctx.Send(SectorPreCommitted{Message: mcid, PreCommitDeposit: deposit, PreCommitInfo: *params})
} }
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
if sector.CommD == nil || sector.CommR == nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
}
params, deposit, _, err := m.preCommitParams(ctx, sector)
if err != nil {
return err
}
mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})
}
return ctx.Send(SectorCommitAggregateSent{mcid})
}
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
if sector.PreCommitMessage == nil { if sector.PreCommitMessage == nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit message was nil")}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit message was nil")})