Sector termination support - address review

This commit is contained in:
Łukasz Magiera 2021-01-14 15:46:57 +01:00
parent 18d38ca42f
commit 49abdd7d7d
4 changed files with 142 additions and 97 deletions

View File

@ -20,6 +20,7 @@ import (
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin" builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
) )
func init() { func init() {
@ -42,6 +43,9 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff
const MinSectorExpiration = miner0.MinSectorExpiration const MinSectorExpiration = miner0.MinSectorExpiration
// Not used / checked in v0
var DeclarationsMax = miner2.DeclarationsMax
func Load(store adt.Store, act *types.Actor) (st State, err error) { func Load(store adt.Store, act *types.Actor) (st State, err error) {
switch act.Code { switch act.Code {
case builtin0.StorageMinerActorCodeID: case builtin0.StorageMinerActorCodeID:

View File

@ -309,8 +309,16 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorRemove{}) return ctx.Send(SectorRemove{})
} }
func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, si SectorInfo) error {
if err := failedCooldown(ctx, sector); err != nil { // ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to
// the Terminating state after cooldown. If the API is still failing, well get back to here
// with the error in SectorInfo log.
pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil)
if pci != nil {
return nil // pause the fsm, needs manual user action
}
if err := failedCooldown(ctx, si); err != nil {
return err return err
} }

View File

@ -53,7 +53,15 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo)
if si == nil { if si == nil {
// either already terminated or not committed yet // either already terminated or not committed yet
// todo / edge case - may be in process of being committed, but let's call that really unlikely
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil)
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)})
}
if pci != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")})
}
return ctx.Send(SectorRemove{}) return ctx.Send(SectorRemove{})
} }

View File

@ -98,105 +98,130 @@ func (b *TerminateBatcher) run() {
forceRes = fr forceRes = fr
} }
dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) var err error
lastMsg, err = b.processBatch(notif, after)
if err != nil { if err != nil {
log.Errorw("TerminateBatcher: getting proving deadline info failed", "error", err) log.Warnw("TerminateBatcher processBatch error", "error", err)
continue
} }
b.lk.Lock()
params := miner2.TerminateSectorsParams{}
var total uint64
for loc, sectors := range b.todo {
n, err := sectors.Count()
if err != nil {
log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
}
// don't send terminations for currently challenged sectors
if loc.Deadline == dl.Index || (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index {
continue
}
if n < 1 {
log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
continue
}
total += n
params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
Deadline: loc.Deadline,
Partition: loc.Partition,
Sectors: *sectors,
})
}
if len(params.Terminations) == 0 {
b.lk.Unlock()
continue // nothing to do
}
if notif && total < TerminateBatchMax {
b.lk.Unlock()
continue
}
if after && total < TerminateBatchMin {
b.lk.Unlock()
continue
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err)
b.lk.Unlock()
continue
}
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil {
log.Warnw("TerminateBatcher: couldn't get miner info", "error", err)
b.lk.Unlock()
continue
}
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
if err != nil {
log.Warnw("TerminateBatcher: no good address found", "error", err)
b.lk.Unlock()
continue
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
if err != nil {
log.Errorw("TerminateBatcher: sending message failed", "error", err)
b.lk.Unlock()
continue
}
lastMsg = &mcid
log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
for _, t := range params.Terminations {
delete(b.todo, SectorLocation{
Deadline: t.Deadline,
Partition: t.Partition,
})
}
for _, w := range b.waiting {
for _, ch := range w {
ch <- mcid // buffered
}
}
b.waiting = map[SectorLocation][]chan cid.Cid{}
b.lk.Unlock()
} }
} }
func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil)
if err != nil {
return nil, xerrors.Errorf("getting proving deadline info failed: %w", err)
}
b.lk.Lock()
defer b.lk.Unlock()
params := miner2.TerminateSectorsParams{}
var total uint64
for loc, sectors := range b.todo {
n, err := sectors.Count()
if err != nil {
log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
continue
}
// don't send terminations for currently challenged sectors
if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain)
loc.Deadline == dl.Index || // not in current
(loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous
continue
}
if n < 1 {
log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
continue
}
toTerminate, err := sectors.Copy()
if err != nil {
log.Warnw("TerminateBatcher: copy sectors bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
continue
}
if total+n > uint64(miner.DeclarationsMax) {
n = uint64(miner.DeclarationsMax) - total
toTerminate, err = toTerminate.Slice(0, n)
if err != nil {
log.Warnw("TerminateBatcher: slice toTerminate bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
continue
}
*sectors, err = bitfield.SubtractBitField(*sectors, toTerminate)
if err != nil {
log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
continue
}
}
total += n
params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
Deadline: loc.Deadline,
Partition: loc.Partition,
Sectors: toTerminate,
})
if total >= uint64(miner.DeclarationsMax) {
break
}
}
if len(params.Terminations) == 0 {
return nil, nil // nothing to do
}
if notif && total < TerminateBatchMax {
return nil, nil
}
if after && total < TerminateBatchMin {
return nil, nil
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err)
}
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
if err != nil {
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
}
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err)
}
log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
for _, t := range params.Terminations {
delete(b.todo, SectorLocation{
Deadline: t.Deadline,
Partition: t.Partition,
})
}
for _, w := range b.waiting {
for _, ch := range w {
ch <- mcid // buffered
}
}
b.waiting = map[SectorLocation][]chan cid.Cid{}
return &mcid, nil
}
// register termination, wait for batch message, return message CID // register termination, wait for batch message, return message CID
func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) { func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) {
maddr, err := address.NewIDAddress(uint64(s.Miner)) maddr, err := address.NewIDAddress(uint64(s.Miner))