diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index f08e2b2b4..544ff7b14 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -666,7 +666,6 @@ type DiffInitActorStateFunc func(ctx context.Context, oldState *init_.State, new func (i *InitActorAddressChanges) AsKey(key string) (adt.Keyer, error) { addr , err := address.NewFromBytes([]byte(key)) if err != nil { - panic(err) return nil, err } return adt.AddrKey(addr), nil @@ -675,17 +674,14 @@ func (i *InitActorAddressChanges) AsKey(key string) (adt.Keyer, error) { func (i *InitActorAddressChanges) Add(key string, val *typegen.Deferred) error { pkAddr, err := address.NewFromBytes([]byte(key)) if err != nil { - panic(err) return err } id := new(typegen.CborInt) if err := id.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil { - panic(err) return err } idAddr, err := address.NewIDAddress(uint64(*id)) if err != nil { - panic(err) return err } i.Added = append(i.Added, AddressPair{ @@ -698,29 +694,24 @@ func (i *InitActorAddressChanges) Add(key string, val *typegen.Deferred) error { func (i *InitActorAddressChanges) Modify(key string, from, to *typegen.Deferred) error { pkAddr, err := address.NewFromBytes([]byte(key)) if err != nil { - panic(err) return err } fromID := new(typegen.CborInt) if err := fromID.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil { - panic(err) return err } fromIDAddr, err := address.NewIDAddress(uint64(*fromID)) if err != nil { - panic(err) return err } toID := new(typegen.CborInt) if err := toID.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil { - panic(err) return err } toIDAddr, err := address.NewIDAddress(uint64(*toID)) if err != nil { - panic(err) return err } @@ -740,17 +731,14 @@ func (i *InitActorAddressChanges) Modify(key string, from, to *typegen.Deferred) func (i *InitActorAddressChanges) Remove(key string, val *typegen.Deferred) error { pkAddr, err := address.NewFromBytes([]byte(key)) if err != nil { - panic(err) return err } id := new(typegen.CborInt) if err := id.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil { - panic(err) return err } idAddr, err := address.NewIDAddress(uint64(*id)) if err != nil { - panic(err) return err } i.Removed = append(i.Removed, AddressPair{ diff --git a/cmd/lotus-chainwatch/processor/common_actors.go b/cmd/lotus-chainwatch/processor/common_actors.go index 3baad612d..b2e86ddc2 100644 --- a/cmd/lotus-chainwatch/processor/common_actors.go +++ b/cmd/lotus-chainwatch/processor/common_actors.go @@ -1,23 +1,18 @@ package processor import ( - "bytes" "context" + "fmt" "time" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" "github.com/ipfs/go-cid" - typegen "github.com/whyrusleeping/cbor-gen" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/specs-actors/actors/builtin" - _init "github.com/filecoin-project/specs-actors/actors/builtin/init" - "github.com/filecoin-project/specs-actors/actors/util/adt" - - "github.com/filecoin-project/lotus/chain/types" - cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" ) func (p *Processor) setupCommonActors() error { @@ -125,6 +120,11 @@ func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[ci return grp.Wait() } +type UpdateAddresses struct { + Old state.AddressPair + New state.AddressPair +} + func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error { start := time.Now() defer func() { @@ -141,41 +141,49 @@ func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]A addressToID[builtin.StorageMarketActorAddr] = builtin.StorageMarketActorAddr addressToID[builtin.VerifiedRegistryActorAddr] = builtin.VerifiedRegistryActorAddr addressToID[builtin.BurntFundsActorAddr] = builtin.BurntFundsActorAddr - initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, types.EmptyTSK) + + addressesToUpdate := []UpdateAddresses{} + + pred := state.NewStatePredicates(p.node) + for _, act := range actors[builtin.InitActorCodeID] { + for _, info := range act { + changed, val, err := pred.OnInitActorChange(pred.OnAddressMapChange())(ctx, info.parentTsKey, info.tsKey) + if err != nil { + return err + } + if !changed { + continue + } + changes := val.(*state.InitActorAddressChanges) + for _, add := range changes.Added { + addressToID[add.PK] = add.ID + } + // we'll need to update any addresses that were modified, this indicates a reorg. + for _, mod := range changes.Modified { + addressesToUpdate = append(addressesToUpdate, UpdateAddresses{ + Old: mod.From, + New: mod.To, + }) + } + } + } + + updateTx, err := p.db.Begin() if err != nil { return err } - initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head) - if err != nil { + for _, updates := range addressesToUpdate { + if _, err := updateTx.Exec( + fmt.Sprintf("update id_address_map set id=%s, address=%s where id=%s and address=%s", updates.New.ID, updates.New.PK, updates.Old.ID, updates.Old.PK), + ); err != nil { + return err + } + } + if err := updateTx.Commit(); err != nil { return err } - var initActorState _init.State - if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil { - return err - } - ctxStore := cw_util.NewAPIIpldStore(ctx, p.node) - addrMap, err := adt.AsMap(ctxStore, initActorState.AddressMap) - if err != nil { - return err - } - // gross.. - var actorID typegen.CborInt - if err := addrMap.ForEach(&actorID, func(key string) error { - longAddr, err := address.NewFromBytes([]byte(key)) - if err != nil { - return err - } - shortAddr, err := address.NewIDAddress(uint64(actorID)) - if err != nil { - return err - } - addressToID[longAddr] = shortAddr - return nil - }); err != nil { - return err - } tx, err := p.db.Begin() if err != nil { return err @@ -207,8 +215,7 @@ create temp table iam (like id_address_map excluding constraints) on commit drop return err } - // HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change - if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil { + if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do nothing`); err != nil { return xerrors.Errorf("actor put: %w", err) }