Wire up Precommit Batching

This commit is contained in:
Łukasz Magiera 2021-05-18 17:37:52 +02:00
parent f66b9c5663
commit 1946d2ffd4
9 changed files with 61 additions and 4 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -103,6 +103,8 @@
* [SectorGetExpectedSealDuration](#SectorGetExpectedSealDuration) * [SectorGetExpectedSealDuration](#SectorGetExpectedSealDuration)
* [SectorGetSealDelay](#SectorGetSealDelay) * [SectorGetSealDelay](#SectorGetSealDelay)
* [SectorMarkForUpgrade](#SectorMarkForUpgrade) * [SectorMarkForUpgrade](#SectorMarkForUpgrade)
* [SectorPreCommitFlush](#SectorPreCommitFlush)
* [SectorPreCommitPending](#SectorPreCommitPending)
* [SectorRemove](#SectorRemove) * [SectorRemove](#SectorRemove)
* [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration) * [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration)
* [SectorSetSealDelay](#SectorSetSealDelay) * [SectorSetSealDelay](#SectorSetSealDelay)
@ -1611,6 +1613,27 @@ Inputs:
Response: `{}` Response: `{}`
### SectorPreCommitFlush
SectorPreCommitFlush immediately sends a PreCommit message with sectors batched for PreCommit.
Returns null if message wasn't sent
Perms: admin
Inputs: `null`
Response: `null`
### SectorPreCommitPending
SectorPreCommitPending returns a list of pending PreCommit sectors to be sent in the next batch message
Perms: admin
Inputs: `null`
Response: `null`
### SectorRemove ### SectorRemove
SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.

View File

@ -113,7 +113,7 @@ func (b *CommitBatcher) run() {
var err error var err error
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
if err != nil { if err != nil {
log.Warnw("TerminateBatcher processBatch error", "error", err) log.Warnw("CommitBatcher processBatch error", "error", err)
} }
} }
} }

View File

@ -79,6 +79,14 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorDealsExpired{}, DealsExpired), on(SectorDealsExpired{}, DealsExpired),
on(SectorInvalidDealIDs{}, RecoverDealIDs), on(SectorInvalidDealIDs{}, RecoverDealIDs),
), ),
SubmitPreCommitBatch: planOne(
on(SectorPreCommitBatchSent{}, PreCommitBatchWait),
),
PreCommitBatchWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed),
on(SectorRetryPreCommit{}, PreCommitting),
),
PreCommitWait: planOne( PreCommitWait: planOne(
on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorChainPreCommitFailed{}, PreCommitFailed),
on(SectorPreCommitLanded{}, WaitSeed), on(SectorPreCommitLanded{}, WaitSeed),
@ -343,6 +351,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handlePreCommitting, processed, nil return m.handlePreCommitting, processed, nil
case SubmitPreCommitBatch: case SubmitPreCommitBatch:
return m.handleSubmitPreCommitBatch, processed, nil return m.handleSubmitPreCommitBatch, processed, nil
case PreCommitBatchWait:
fallthrough
case PreCommitWait: case PreCommitWait:
return m.handlePreCommitWait, processed, nil return m.handlePreCommitWait, processed, nil
case WaitSeed: case WaitSeed:

View File

@ -154,6 +154,14 @@ type SectorPreCommitBatch struct{}
func (evt SectorPreCommitBatch) apply(*SectorInfo) {} func (evt SectorPreCommitBatch) apply(*SectorInfo) {}
type SectorPreCommitBatchSent struct {
Message cid.Cid
}
func (evt SectorPreCommitBatchSent) apply(state *SectorInfo) {
state.PreCommitMessage = &evt.Message
}
type SectorPreCommitLanded struct { type SectorPreCommitLanded struct {
TipSet TipSetToken TipSet TipSetToken
} }

View File

@ -104,7 +104,7 @@ func (b *PreCommitBatcher) run() {
var err error var err error
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
if err != nil { if err != nil {
log.Warnw("TerminateBatcher processBatch error", "error", err) log.Warnw("PreCommitBatcher processBatch error", "error", err)
} }
} }
} }

View File

@ -302,6 +302,22 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
} }
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting config: %w", err)
}
if cfg.BatchPreCommits {
nv, err := m.api.StateNetworkVersion(ctx.Context(), nil)
if err != nil {
return xerrors.Errorf("getting network version: %w", err)
}
if nv >= network.Version13 {
return ctx.Send(SectorPreCommitBatch{})
}
}
params, deposit, tok, err := m.preCommitParams(ctx, sector) params, deposit, tok, err := m.preCommitParams(ctx, sector)
if err != nil { if err != nil {
return err return err
@ -349,10 +365,10 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se
mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) mcid, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
} }
return ctx.Send(SectorCommitAggregateSent{mcid}) return ctx.Send(SectorPreCommitBatchSent{mcid})
} }
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {