improve diff logic

* Make diffing work across versions.
* Start porting more chainwatch logic.
This commit is contained in:
Steven Allen 2020-09-17 21:39:34 -07:00
parent 5bcfee0042
commit b2ee59024f
6 changed files with 364 additions and 287 deletions

View File

@ -0,0 +1,127 @@
package miner
import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/actors/adt"
cbg "github.com/whyrusleeping/cbor-gen"
)
func DiffPreCommits(pre, cur State) (*PreCommitChanges, error) {
results := new(PreCommitChanges)
prep, err := pre.precommits()
if err != nil {
return nil, err
}
curp, err := cur.precommits()
if err != nil {
return nil, err
}
err = adt.DiffAdtMap(prep, curp, &preCommitDiffer{results, pre, cur})
if err != nil {
return nil, err
}
return results, nil
}
type preCommitDiffer struct {
Results *PreCommitChanges
pre, after State
}
func (m *preCommitDiffer) AsKey(key string) (abi.Keyer, error) {
sector, err := abi.ParseUIntKey(key)
if err != nil {
return nil, err
}
return abi.UIntKey(sector), nil
}
func (m *preCommitDiffer) Add(key string, val *cbg.Deferred) error {
sp, err := m.after.decodeSectorPreCommitOnChainInfo(val)
if err != nil {
return err
}
m.Results.Added = append(m.Results.Added, sp)
return nil
}
func (m *preCommitDiffer) Modify(key string, from, to *cbg.Deferred) error {
return nil
}
func (m *preCommitDiffer) Remove(key string, val *cbg.Deferred) error {
sp, err := m.pre.decodeSectorPreCommitOnChainInfo(val)
if err != nil {
return err
}
m.Results.Removed = append(m.Results.Removed, sp)
return nil
}
func DiffSectors(pre, cur State) (*SectorChanges, error) {
results := new(SectorChanges)
pres, err := pre.sectors()
if err != nil {
return nil, err
}
curs, err := cur.sectors()
if err != nil {
return nil, err
}
err = adt.DiffAdtArray(pres, curs, &sectorDiffer{results, pre, cur})
if err != nil {
return nil, err
}
return results, nil
}
type sectorDiffer struct {
Results *SectorChanges
pre, after State
}
func (m *sectorDiffer) Add(key uint64, val *cbg.Deferred) error {
si, err := m.after.decodeSectorOnChainInfo(val)
if err != nil {
return err
}
m.Results.Added = append(m.Results.Added, si)
return nil
}
func (m *sectorDiffer) Modify(key uint64, from, to *cbg.Deferred) error {
siFrom, err := m.pre.decodeSectorOnChainInfo(from)
if err != nil {
return err
}
siTo, err := m.after.decodeSectorOnChainInfo(to)
if err != nil {
return err
}
if siFrom.Expiration != siTo.Expiration {
m.Results.Extended = append(m.Results.Extended, SectorExtensions{
From: siFrom,
To: siTo,
})
}
return nil
}
func (m *sectorDiffer) Remove(key uint64, val *cbg.Deferred) error {
si, err := m.pre.decodeSectorOnChainInfo(val)
if err != nil {
return err
}
m.Results.Removed = append(m.Results.Removed, si)
return nil
}

View File

@ -3,6 +3,7 @@ package miner
import (
"github.com/filecoin-project/go-state-types/dline"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
@ -42,21 +43,30 @@ type State interface {
GetSectorExpiration(abi.SectorNumber) (*SectorExpiration, error)
GetPrecommittedSector(abi.SectorNumber) (*SectorPreCommitOnChainInfo, error)
LoadSectorsFromSet(filter *bitfield.BitField, filterOut bool) (adt.Array, error)
LoadPreCommittedSectors() (adt.Map, error)
IsAllocated(abi.SectorNumber) (bool, error)
LoadDeadline(idx uint64) (Deadline, error)
ForEachDeadline(cb func(idx uint64, dl Deadline) error) error
NumDeadlines() (uint64, error)
DeadlinesChanged(State) bool
Info() (MinerInfo, error)
DeadlineInfo(epoch abi.ChainEpoch) *dline.Info
// Diff helpers. Used by Diff* functions internally.
sectors() (adt.Array, error)
decodeSectorOnChainInfo(*cbg.Deferred) (SectorOnChainInfo, error)
precommits() (adt.Map, error)
decodeSectorPreCommitOnChainInfo(*cbg.Deferred) (SectorPreCommitOnChainInfo, error)
}
type Deadline interface {
LoadPartition(idx uint64) (Partition, error)
ForEachPartition(cb func(idx uint64, part Partition) error) error
PostSubmissions() (bitfield.BitField, error)
PartitionsChanged(Deadline) bool
}
type Partition interface {
@ -110,3 +120,19 @@ type SectorLocation struct {
Deadline uint64
Partition uint64
}
type SectorChanges struct {
Added []SectorOnChainInfo
Extended []SectorExtensions
Removed []SectorOnChainInfo
}
type SectorExtensions struct {
From SectorOnChainInfo
To SectorOnChainInfo
}
type PreCommitChanges struct {
Added []SectorPreCommitOnChainInfo
Removed []SectorPreCommitOnChainInfo
}

View File

@ -1,6 +1,7 @@
package miner
import (
"bytes"
"errors"
"github.com/filecoin-project/go-address"
@ -206,6 +207,16 @@ func (s *v0State) NumDeadlines() (uint64, error) {
return v0miner.WPoStPeriodDeadlines, nil
}
func (s *v0State) DeadlinesChanged(other State) bool {
v0other, ok := other.(*v0State)
if !ok {
// treat an upgrade as a change, always
return true
}
return s.State.Deadlines.Equals(v0other.Deadlines)
}
func (s *v0State) Info() (MinerInfo, error) {
info, err := s.State.GetInfo(s.store)
if err != nil {
@ -244,6 +255,26 @@ func (s *v0State) DeadlineInfo(epoch abi.ChainEpoch) *dline.Info {
return s.State.DeadlineInfo(epoch)
}
func (s *v0State) sectors() (adt.Array, error) {
return v0adt.AsArray(s.store, s.Sectors)
}
func (s *v0State) decodeSectorOnChainInfo(val *cbg.Deferred) (SectorOnChainInfo, error) {
var si v0miner.SectorOnChainInfo
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
return si, err
}
func (s *v0State) precommits() (adt.Map, error) {
return v0adt.AsMap(s.store, s.PreCommittedSectors)
}
func (s *v0State) decodeSectorPreCommitOnChainInfo(val *cbg.Deferred) (SectorPreCommitOnChainInfo, error) {
var sp v0miner.SectorPreCommitOnChainInfo
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
return sp, err
}
func (d *v0Deadline) LoadPartition(idx uint64) (Partition, error) {
p, err := d.Deadline.LoadPartition(d.store, idx)
if err != nil {
@ -263,6 +294,16 @@ func (d *v0Deadline) ForEachPartition(cb func(uint64, Partition) error) error {
})
}
func (s *v0Deadline) PartitionsChanged(other Deadline) bool {
v0other, ok := other.(*v0Deadline)
if !ok {
// treat an upgrade as a change, always
return true
}
return s.Deadline.Partitions.Equals(v0other.Deadline.Partitions)
}
func (d *v0Deadline) PostSubmissions() (bitfield.BitField, error) {
return d.Deadline.PostSubmissions, nil
}

View File

@ -4,8 +4,6 @@ import (
"bytes"
"context"
v0miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/go-address"
@ -325,83 +323,12 @@ func (sp *StatePredicates) OnMinerActorChange(minerAddr address.Address, diffMin
})
}
type MinerSectorChanges struct {
Added []miner.SectorOnChainInfo
Extended []SectorExtensions
Removed []miner.SectorOnChainInfo
}
var _ adt.AdtArrayDiff = &MinerSectorChanges{}
type SectorExtensions struct {
From miner.SectorOnChainInfo
To miner.SectorOnChainInfo
}
func (m *MinerSectorChanges) Add(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *si)
return nil
}
func (m *MinerSectorChanges) Modify(key uint64, from, to *typegen.Deferred) error {
siFrom := new(miner.SectorOnChainInfo)
err := siFrom.UnmarshalCBOR(bytes.NewReader(from.Raw))
if err != nil {
return err
}
siTo := new(miner.SectorOnChainInfo)
err = siTo.UnmarshalCBOR(bytes.NewReader(to.Raw))
if err != nil {
return err
}
if siFrom.Expiration != siTo.Expiration {
m.Extended = append(m.Extended, SectorExtensions{
From: *siFrom,
To: *siTo,
})
}
return nil
}
func (m *MinerSectorChanges) Remove(key uint64, val *typegen.Deferred) error {
si := new(miner.SectorOnChainInfo)
err := si.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *si)
return nil
}
func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState miner.State) (changed bool, user UserData, err error) {
sectorChanges := &MinerSectorChanges{
Added: []miner.SectorOnChainInfo{},
Extended: []SectorExtensions{},
Removed: []miner.SectorOnChainInfo{},
}
oldSectors, err := oldState.LoadSectorsFromSet(nil, false)
sectorChanges, err := miner.DiffSectors(oldState, newState)
if err != nil {
return false, nil, err
}
newSectors, err := newState.LoadSectorsFromSet(nil, false)
if err != nil {
return false, nil, err
}
if err := adt.DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil {
return false, nil, err
}
// nothing changed
if len(sectorChanges.Added)+len(sectorChanges.Extended)+len(sectorChanges.Removed) == 0 {
return false, nil, nil
@ -411,64 +338,13 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
}
}
type MinerPreCommitChanges struct {
Added []miner.SectorPreCommitOnChainInfo
Removed []miner.SectorPreCommitOnChainInfo
}
func (m *MinerPreCommitChanges) AsKey(key string) (abi.Keyer, error) {
sector, err := abi.ParseUIntKey(key)
if err != nil {
return nil, err
}
return v0miner.SectorKey(abi.SectorNumber(sector)), nil
}
func (m *MinerPreCommitChanges) Add(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *sp)
return nil
}
func (m *MinerPreCommitChanges) Modify(key string, from, to *typegen.Deferred) error {
return nil
}
func (m *MinerPreCommitChanges) Remove(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *sp)
return nil
}
func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState miner.State) (changed bool, user UserData, err error) {
precommitChanges := &MinerPreCommitChanges{
Added: []miner.SectorPreCommitOnChainInfo{},
Removed: []miner.SectorPreCommitOnChainInfo{},
}
oldPrecommits, err := oldState.LoadPreCommittedSectors()
precommitChanges, err := miner.DiffPreCommits(oldState, newState)
if err != nil {
return false, nil, err
}
newPrecommits, err := newState.LoadPreCommittedSectors()
if err != nil {
return false, nil, err
}
if err := adt.DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil {
return false, nil, err
}
if len(precommitChanges.Added)+len(precommitChanges.Removed) == 0 {
return false, nil, nil
}

View File

@ -409,7 +409,7 @@ func TestMinerSectorChange(t *testing.T) {
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok := val.(*MinerSectorChanges)
sectorChanges, ok := val.(*miner.SectorChanges)
require.True(t, ok)
require.Equal(t, len(sectorChanges.Added), 1)
@ -433,7 +433,7 @@ func TestMinerSectorChange(t *testing.T) {
require.True(t, change)
require.NotNil(t, val)
sectorChanges, ok = val.(*MinerSectorChanges)
sectorChanges, ok = val.(*miner.SectorChanges)
require.True(t, ok)
require.Equal(t, 1, len(sectorChanges.Added))

View File

@ -16,12 +16,14 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)
@ -204,6 +206,8 @@ func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSe
log.Debugw("Processed Miners", "duration", time.Since(start).String())
}()
stor := store.ActorStore(ctx, apibstore.NewAPIBlockstore(p.node))
var out []minerActorInfo
// TODO add parallel calls if this becomes slow
for tipset, miners := range minerTips {
@ -230,15 +234,13 @@ func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSe
mi.rawPower = claim.RawBytePower
}
// Get the miner state info
astb, err := p.node.ChainReadObj(ctx, act.act.Head)
// Get the miner state
mas, err := miner.Load(stor, &act.act)
if err != nil {
log.Warnw("failed to find miner actor state", "address", act.addr, "error", err)
continue
}
if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
return nil, err
}
mi.state = mas
out = append(out, mi)
}
}
@ -322,11 +324,6 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
for _, m := range miners {
m := m
grp.Go(func() error {
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
if err != nil {
return err
}
changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
@ -399,10 +396,12 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
}
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), &sector); err != nil {
// TODO: we can optimize this to not load the AMT every time, if necessary.
si, err := m.state.GetSector(removed.Info.SectorNumber)
if err != nil {
return err
} else if !found {
}
if si == nil {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
}
}
@ -653,21 +652,12 @@ func (p *Processor) storeMinerSectorEvents(ctx context.Context, sectorEvents, pr
func (p *Processor) getMinerStateAt(ctx context.Context, maddr address.Address, tskey types.TipSetKey) (miner.State, error) {
prevActor, err := p.node.StateGetActor(ctx, maddr, tskey)
if err != nil {
return miner.State{}, err
return nil, err
}
var out miner.State
// Get the miner state info
astb, err := p.node.ChainReadObj(ctx, prevActor.Head)
if err != nil {
return miner.State{}, err
}
if err := out.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
return miner.State{}, err
}
return out, nil
return miner.Load(store.ActorStore(ctx, apibstore.NewAPIBlockstore(p.node)), prevActor)
}
func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorInfo) (*state.MinerPreCommitChanges, error) {
func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorInfo) (*miner.PreCommitChanges, error) {
pred := state.NewStatePredicates(p.node)
changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange())(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
@ -676,11 +666,11 @@ func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorIn
if !changed {
return nil, nil
}
out := val.(*state.MinerPreCommitChanges)
out := val.(*miner.PreCommitChanges)
return out, nil
}
func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo) (*state.MinerSectorChanges, error) {
func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo) (*miner.SectorChanges, error) {
pred := state.NewStatePredicates(p.node)
changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
@ -689,7 +679,7 @@ func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo)
if !changed {
return nil, nil
}
out := val.(*state.MinerSectorChanges)
out := val.(*miner.SectorChanges)
return out, nil
}
@ -698,8 +688,20 @@ func (p *Processor) diffMinerPartitions(ctx context.Context, m minerActorInfo, e
if err != nil {
return err
}
dlIdx := prevMiner.CurrentDeadline
curMiner := m.state
if !prevMiner.DeadlinesChanged(curMiner) {
return nil
}
panic("TODO")
// FIXME: This code doesn't work.
// 1. We need to diff all deadlines, not just the "current" deadline.
// 2. We need to handle the case where we _add_ a partition. (i.e.,
// where len(newPartitions) != len(oldPartitions).
/*
// NOTE: If we change the number of deadlines in an upgrade, this will
// break.
// load the old deadline
prevDls, err := prevMiner.LoadDeadlines(p.ctxStore)
@ -803,74 +805,79 @@ func (p *Processor) diffMinerPartitions(ctx context.Context, m minerActorInfo, e
}
return nil
*/
}
func (p *Processor) diffPartition(prevPart, curPart miner.Partition) (*PartitionStatus, error) {
// all the sectors that were in previous but not in current
allRemovedSectors, err := bitfield.SubtractBitField(prevPart.Sectors, curPart.Sectors)
prevLiveSectors, err := prevPart.LiveSectors()
if err != nil {
return nil, err
}
curLiveSectors, err := curPart.LiveSectors()
if err != nil {
return nil, err
}
// list of sectors that were terminated before their expiration.
terminatedEarlyArr, err := adt.AsArray(p.ctxStore, curPart.EarlyTerminated)
removedSectors, err := bitfield.SubtractBitField(prevLiveSectors, curLiveSectors)
if err != nil {
return nil, err
}
expired := bitfield.New()
var bf bitfield.BitField
if err := terminatedEarlyArr.ForEach(&bf, func(i int64) error {
// expired = all removals - termination
expirations, err := bitfield.SubtractBitField(allRemovedSectors, bf)
if err != nil {
return err
}
// merge with expired sectors from other epochs
expired, err = bitfield.MergeBitFields(expirations, expired)
if err != nil {
return nil
}
return nil
}); err != nil {
return nil, err
}
// terminated = all removals - expired
terminated, err := bitfield.SubtractBitField(allRemovedSectors, expired)
prevRecoveries, err := prevPart.RecoveringSectors()
if err != nil {
return nil, err
}
// faults in current but not previous
faults, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries)
curRecoveries, err := curPart.RecoveringSectors()
if err != nil {
return nil, err
}
// recoveries in current but not previous
inRecovery, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries)
newRecoveries, err := bitfield.SubtractBitField(curRecoveries, prevRecoveries)
if err != nil {
return nil, err
}
prevFaults, err := prevPart.FaultySectors()
if err != nil {
return nil, err
}
curFaults, err := curPart.FaultySectors()
if err != nil {
return nil, err
}
newFaults, err := bitfield.SubtractBitField(curFaults, prevFaults)
if err != nil {
return nil, err
}
// all current good sectors
newActiveSectors, err := curPart.ActiveSectors()
curActiveSectors, err := curPart.ActiveSectors()
if err != nil {
return nil, err
}
// sectors that were previously fault and are now currently active are considered recovered.
recovered, err := bitfield.IntersectBitField(prevPart.Faults, newActiveSectors)
recovered, err := bitfield.IntersectBitField(prevFaults, curActiveSectors)
if err != nil {
return nil, err
}
// TODO: distinguish between "terminated" and "expired" sectors. The
// previous code here never had a chance of working in the first place,
// so I'm not going to try to replicate it right now.
//
// How? If the sector expires before it should (according to sector
// info) and it wasn't replaced by a pre-commit deleted in this change
// set, it was "early terminated".
return &PartitionStatus{
Terminated: terminated,
Expired: expired,
Faulted: faults,
InRecovery: inRecovery,
Terminated: bitfield.New(),
Expired: removedSectors,
Faulted: newFaults,
InRecovery: newRecoveries,
Recovered: recovered,
}, nil
}