Merge pull request #9642 from filecoin-project/fix/snapsector-clear-ctime
fix: sealing pipeline: Clear CreationTime when starting sector upgrade
This commit is contained in:
commit
51e92c97c3
@ -323,6 +323,9 @@ func (evt SectorStartCCUpdate) apply(state *SectorInfo) {
|
|||||||
// Clear filler piece but remember in case of abort
|
// Clear filler piece but remember in case of abort
|
||||||
state.CCPieces = state.Pieces
|
state.CCPieces = state.Pieces
|
||||||
state.Pieces = nil
|
state.Pieces = nil
|
||||||
|
|
||||||
|
// Clear CreationTime in case this sector was accepting piece data previously
|
||||||
|
state.CreationTime = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorReplicaUpdate struct {
|
type SectorReplicaUpdate struct {
|
||||||
@ -458,6 +461,7 @@ func (evt SectorRevertUpgradeToProving) apply(state *SectorInfo) {
|
|||||||
state.ReplicaUpdateMessage = nil
|
state.ReplicaUpdateMessage = nil
|
||||||
state.Pieces = state.CCPieces
|
state.Pieces = state.CCPieces
|
||||||
state.CCPieces = nil
|
state.CCPieces = nil
|
||||||
|
state.CreationTime = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorRetrySubmitReplicaUpdateWait struct{}
|
type SectorRetrySubmitReplicaUpdateWait struct{}
|
||||||
|
@ -390,3 +390,64 @@ func TestTicketExpired(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCreationTimeCleared(t *testing.T) {
|
||||||
|
var notif []struct{ before, after SectorInfo }
|
||||||
|
ma, _ := address.NewIDAddress(55151)
|
||||||
|
m := test{
|
||||||
|
s: &Sealing{
|
||||||
|
maddr: ma,
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]SectorState{},
|
||||||
|
byState: map[SectorState]int64{},
|
||||||
|
},
|
||||||
|
notifee: func(before, after SectorInfo) {
|
||||||
|
notif = append(notif, struct{ before, after SectorInfo }{before, after})
|
||||||
|
},
|
||||||
|
},
|
||||||
|
t: t,
|
||||||
|
state: &SectorInfo{State: Available},
|
||||||
|
}
|
||||||
|
|
||||||
|
// sector starts with zero CreationTime
|
||||||
|
m.planSingle(SectorStartCCUpdate{})
|
||||||
|
require.Equal(m.t, m.state.State, SnapDealsWaitDeals)
|
||||||
|
|
||||||
|
require.Equal(t, int64(0), m.state.CreationTime)
|
||||||
|
|
||||||
|
// First AddPiece will set CreationTime
|
||||||
|
m.planSingle(SectorAddPiece{})
|
||||||
|
require.Equal(m.t, m.state.State, SnapDealsAddPiece)
|
||||||
|
|
||||||
|
require.NotEqual(t, int64(0), m.state.CreationTime)
|
||||||
|
|
||||||
|
m.planSingle(SectorPieceAdded{})
|
||||||
|
require.Equal(m.t, m.state.State, SnapDealsWaitDeals)
|
||||||
|
|
||||||
|
// abort shoult clean up CreationTime
|
||||||
|
m.planSingle(SectorAbortUpgrade{})
|
||||||
|
require.Equal(m.t, m.state.State, AbortUpgrade)
|
||||||
|
|
||||||
|
require.NotEqual(t, int64(0), m.state.CreationTime)
|
||||||
|
|
||||||
|
m.planSingle(SectorRevertUpgradeToProving{})
|
||||||
|
require.Equal(m.t, m.state.State, Proving)
|
||||||
|
|
||||||
|
require.Equal(t, int64(0), m.state.CreationTime)
|
||||||
|
|
||||||
|
m.planSingle(SectorMarkForUpdate{})
|
||||||
|
|
||||||
|
// in case CreationTime was set for whatever reason (lotus bug / manual sector state change)
|
||||||
|
// make sure we clean it up when starting upgrade
|
||||||
|
m.state.CreationTime = 325
|
||||||
|
m.planSingle(SectorStartCCUpdate{})
|
||||||
|
require.Equal(m.t, m.state.State, SnapDealsWaitDeals)
|
||||||
|
|
||||||
|
require.Equal(t, int64(0), m.state.CreationTime)
|
||||||
|
|
||||||
|
// "First" AddPiece will set CreationTime
|
||||||
|
m.planSingle(SectorAddPiece{})
|
||||||
|
require.Equal(m.t, m.state.State, SnapDealsAddPiece)
|
||||||
|
|
||||||
|
require.NotEqual(t, int64(0), m.state.CreationTime)
|
||||||
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
"go.uber.org/zap"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
||||||
@ -91,12 +92,17 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
|
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
|
||||||
|
log := log.WithOptions(zap.Fields(
|
||||||
|
zap.Uint64("sector", uint64(sector.SectorNumber)),
|
||||||
|
zap.Int("deals", len(sector.dealIDs())),
|
||||||
|
))
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
|
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
|
||||||
if st != nil {
|
if st != nil {
|
||||||
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
|
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
|
||||||
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
|
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
|
||||||
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
|
log.Infow("starting to seal deal sector", "trigger", "wait-timeout")
|
||||||
return true, ctx.Send(SectorStartPacking{})
|
return true, ctx.Send(SectorStartPacking{})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -113,13 +119,13 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
|
|||||||
|
|
||||||
if len(sector.dealIDs()) >= maxDeals {
|
if len(sector.dealIDs()) >= maxDeals {
|
||||||
// can't accept more deals
|
// can't accept more deals
|
||||||
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
|
log.Infow("starting to seal deal sector", "trigger", "maxdeals")
|
||||||
return true, ctx.Send(SectorStartPacking{})
|
return true, ctx.Send(SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
if used.Padded() == abi.PaddedPieceSize(ssize) {
|
if used.Padded() == abi.PaddedPieceSize(ssize) {
|
||||||
// sector full
|
// sector full
|
||||||
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
|
log.Infow("starting to seal deal sector", "trigger", "filled")
|
||||||
return true, ctx.Send(SectorStartPacking{})
|
return true, ctx.Send(SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,15 +155,15 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if now.After(sealTime) {
|
if now.After(sealTime) {
|
||||||
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
|
log.Infow("starting to seal deal sector", "trigger", "wait-timeout", "creation", sector.CreationTime)
|
||||||
return true, ctx.Send(SectorStartPacking{})
|
return true, ctx.Send(SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
|
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
|
||||||
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer")
|
log.Infow("starting to seal deal sector", "trigger", "wait-timer")
|
||||||
|
|
||||||
if err := ctx.Send(SectorStartPacking{}); err != nil {
|
if err := ctx.Send(SectorStartPacking{}); err != nil {
|
||||||
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
|
log.Errorw("sending SectorStartPacking event failed", "error", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user