fix: sealing pipeline: Clear CreationTime when starting sector upgrade

This commit is contained in:
Łukasz Magiera 2022-11-14 18:10:53 +01:00
parent 950e6116e5
commit d8c1b67a22
2 changed files with 15 additions and 6 deletions

View File

@ -323,6 +323,9 @@ func (evt SectorStartCCUpdate) apply(state *SectorInfo) {
// Clear filler piece but remember in case of abort // Clear filler piece but remember in case of abort
state.CCPieces = state.Pieces state.CCPieces = state.Pieces
state.Pieces = nil state.Pieces = nil
// Clear CreationTime in case this sector was accepting piece data previously
state.CreationTime = 0
} }
type SectorReplicaUpdate struct { type SectorReplicaUpdate struct {

View File

@ -2,6 +2,7 @@ package sealing
import ( import (
"context" "context"
"go.uber.org/zap"
"sort" "sort"
"time" "time"
@ -91,12 +92,17 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
} }
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) { func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
log := log.WithOptions(zap.Fields(
zap.Uint64("sector", uint64(sector.SectorNumber)),
zap.Int("deals", len(sector.dealIDs())),
))
now := time.Now() now := time.Now()
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)] st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
if st != nil { if st != nil {
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
// we send another SectorStartPacking in case one was sent in the handleAddPiece state // we send another SectorStartPacking in case one was sent in the handleAddPiece state
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") log.Infow("starting to seal deal sector", "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{}) return true, ctx.Send(SectorStartPacking{})
} }
} }
@ -113,13 +119,13 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
if len(sector.dealIDs()) >= maxDeals { if len(sector.dealIDs()) >= maxDeals {
// can't accept more deals // can't accept more deals
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals") log.Infow("starting to seal deal sector", "trigger", "maxdeals")
return true, ctx.Send(SectorStartPacking{}) return true, ctx.Send(SectorStartPacking{})
} }
if used.Padded() == abi.PaddedPieceSize(ssize) { if used.Padded() == abi.PaddedPieceSize(ssize) {
// sector full // sector full
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled") log.Infow("starting to seal deal sector", "trigger", "filled")
return true, ctx.Send(SectorStartPacking{}) return true, ctx.Send(SectorStartPacking{})
} }
@ -149,15 +155,15 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
} }
if now.After(sealTime) { if now.After(sealTime) {
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout") log.Infow("starting to seal deal sector", "trigger", "wait-timeout", "creation", sector.CreationTime)
return true, ctx.Send(SectorStartPacking{}) return true, ctx.Send(SectorStartPacking{})
} }
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() { m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer") log.Infow("starting to seal deal sector", "trigger", "wait-timer")
if err := ctx.Send(SectorStartPacking{}); err != nil { if err := ctx.Send(SectorStartPacking{}); err != nil {
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err) log.Errorw("sending SectorStartPacking event failed", "error", err)
} }
}) })
} }