storagefsm: Fix unlocking in handleWaitDeals

This commit is contained in:
Łukasz Magiera 2021-01-21 19:59:18 +01:00
parent 1336d8855d
commit ec4deb7e28

View File

@ -19,70 +19,18 @@ import (
) )
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
now := time.Now()
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
if st != nil {
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
m.inputLk.Unlock()
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return ctx.Send(SectorStartPacking{})
}
}
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return xerrors.Errorf("getting sector size")
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return xerrors.Errorf("getting per-sector deal limit: %w", err)
}
if len(sector.dealIDs()) >= maxDeals {
// can't accept more deals
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
return ctx.Send(SectorStartPacking{})
}
var used abi.UnpaddedPieceSize var used abi.UnpaddedPieceSize
for _, piece := range sector.Pieces { for _, piece := range sector.Pieces {
used += piece.Piece.Size.Unpadded() used += piece.Piece.Size.Unpadded()
} }
if used.Padded() == abi.PaddedPieceSize(ssize) { m.inputLk.Lock()
// sector full
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
return ctx.Send(SectorStartPacking{})
}
if sector.CreationTime != 0 { started, err := m.maybeStartSealing(ctx, sector, used)
cfg, err := m.getConfig() if err != nil || started {
if err != nil {
m.inputLk.Unlock() m.inputLk.Unlock()
return xerrors.Errorf("getting storage config: %w", err)
}
// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline return err
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
if now.After(sealTime) {
m.inputLk.Unlock()
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return ctx.Send(SectorStartPacking{})
}
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer")
if err := ctx.Send(SectorStartPacking{}); err != nil {
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
}
})
} }
m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{ m.openSectors[m.minerSectorID(sector.SectorNumber)] = &openSector{
@ -107,6 +55,67 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
return nil return nil
} }
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
now := time.Now()
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
if st != nil {
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{})
}
}
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size")
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return false, xerrors.Errorf("getting per-sector deal limit: %w", err)
}
if len(sector.dealIDs()) >= maxDeals {
// can't accept more deals
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
return true, ctx.Send(SectorStartPacking{})
}
if used.Padded() == abi.PaddedPieceSize(ssize) {
// sector full
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
return true, ctx.Send(SectorStartPacking{})
}
if sector.CreationTime != 0 {
cfg, err := m.getConfig()
if err != nil {
m.inputLk.Unlock()
return false, xerrors.Errorf("getting storage config: %w", err)
}
// todo check deal age, start sealing if any deal has less than X (configurable) to start deadline
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
if now.After(sealTime) {
m.inputLk.Unlock()
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{})
}
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer")
if err := ctx.Send(SectorStartPacking{}); err != nil {
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
}
})
}
return false, nil
}
func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
ssize, err := sector.SectorType.SectorSize() ssize, err := sector.SectorType.SectorSize()
if err != nil { if err != nil {