bug: fix id_address_map table conflicts on reorg

This commit is contained in:
frrist 2020-08-12 16:50:00 -07:00
parent ce21557d64
commit 69a472c4ea
2 changed files with 44 additions and 49 deletions

View File

@ -666,7 +666,6 @@ type DiffInitActorStateFunc func(ctx context.Context, oldState *init_.State, new
func (i *InitActorAddressChanges) AsKey(key string) (adt.Keyer, error) {
addr , err := address.NewFromBytes([]byte(key))
if err != nil {
panic(err)
return nil, err
}
return adt.AddrKey(addr), nil
@ -675,17 +674,14 @@ func (i *InitActorAddressChanges) AsKey(key string) (adt.Keyer, error) {
func (i *InitActorAddressChanges) Add(key string, val *typegen.Deferred) error {
pkAddr, err := address.NewFromBytes([]byte(key))
if err != nil {
panic(err)
return err
}
id := new(typegen.CborInt)
if err := id.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
panic(err)
return err
}
idAddr, err := address.NewIDAddress(uint64(*id))
if err != nil {
panic(err)
return err
}
i.Added = append(i.Added, AddressPair{
@ -698,29 +694,24 @@ func (i *InitActorAddressChanges) Add(key string, val *typegen.Deferred) error {
func (i *InitActorAddressChanges) Modify(key string, from, to *typegen.Deferred) error {
pkAddr, err := address.NewFromBytes([]byte(key))
if err != nil {
panic(err)
return err
}
fromID := new(typegen.CborInt)
if err := fromID.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil {
panic(err)
return err
}
fromIDAddr, err := address.NewIDAddress(uint64(*fromID))
if err != nil {
panic(err)
return err
}
toID := new(typegen.CborInt)
if err := toID.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil {
panic(err)
return err
}
toIDAddr, err := address.NewIDAddress(uint64(*toID))
if err != nil {
panic(err)
return err
}
@ -740,17 +731,14 @@ func (i *InitActorAddressChanges) Modify(key string, from, to *typegen.Deferred)
func (i *InitActorAddressChanges) Remove(key string, val *typegen.Deferred) error {
pkAddr, err := address.NewFromBytes([]byte(key))
if err != nil {
panic(err)
return err
}
id := new(typegen.CborInt)
if err := id.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
panic(err)
return err
}
idAddr, err := address.NewIDAddress(uint64(*id))
if err != nil {
panic(err)
return err
}
i.Removed = append(i.Removed, AddressPair{

View File

@ -1,23 +1,18 @@
package processor
import (
"bytes"
"context"
"fmt"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
typegen "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/specs-actors/actors/builtin"
_init "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/chain/types"
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)
func (p *Processor) setupCommonActors() error {
@ -125,6 +120,11 @@ func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[ci
return grp.Wait()
}
type UpdateAddresses struct {
Old state.AddressPair
New state.AddressPair
}
func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error {
start := time.Now()
defer func() {
@ -141,41 +141,49 @@ func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]A
addressToID[builtin.StorageMarketActorAddr] = builtin.StorageMarketActorAddr
addressToID[builtin.VerifiedRegistryActorAddr] = builtin.VerifiedRegistryActorAddr
addressToID[builtin.BurntFundsActorAddr] = builtin.BurntFundsActorAddr
initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, types.EmptyTSK)
addressesToUpdate := []UpdateAddresses{}
pred := state.NewStatePredicates(p.node)
for _, act := range actors[builtin.InitActorCodeID] {
for _, info := range act {
changed, val, err := pred.OnInitActorChange(pred.OnAddressMapChange())(ctx, info.parentTsKey, info.tsKey)
if err != nil {
return err
}
if !changed {
continue
}
changes := val.(*state.InitActorAddressChanges)
for _, add := range changes.Added {
addressToID[add.PK] = add.ID
}
// we'll need to update any addresses that were modified, this indicates a reorg.
for _, mod := range changes.Modified {
addressesToUpdate = append(addressesToUpdate, UpdateAddresses{
Old: mod.From,
New: mod.To,
})
}
}
}
updateTx, err := p.db.Begin()
if err != nil {
return err
}
initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head)
if err != nil {
for _, updates := range addressesToUpdate {
if _, err := updateTx.Exec(
fmt.Sprintf("update id_address_map set id=%s, address=%s where id=%s and address=%s", updates.New.ID, updates.New.PK, updates.Old.ID, updates.Old.PK),
); err != nil {
return err
}
}
if err := updateTx.Commit(); err != nil {
return err
}
var initActorState _init.State
if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil {
return err
}
ctxStore := cw_util.NewAPIIpldStore(ctx, p.node)
addrMap, err := adt.AsMap(ctxStore, initActorState.AddressMap)
if err != nil {
return err
}
// gross..
var actorID typegen.CborInt
if err := addrMap.ForEach(&actorID, func(key string) error {
longAddr, err := address.NewFromBytes([]byte(key))
if err != nil {
return err
}
shortAddr, err := address.NewIDAddress(uint64(actorID))
if err != nil {
return err
}
addressToID[longAddr] = shortAddr
return nil
}); err != nil {
return err
}
tx, err := p.db.Begin()
if err != nil {
return err
@ -207,8 +215,7 @@ create temp table iam (like id_address_map excluding constraints) on commit drop
return err
}
// HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change
if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil {
if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do nothing`); err != nil {
return xerrors.Errorf("actor put: %w", err)
}