diff --git a/cmd/lotus-chainwatch/processor/common_actors.go b/cmd/lotus-chainwatch/processor/common_actors.go index 3fbdbf170..5264a48c8 100644 --- a/cmd/lotus-chainwatch/processor/common_actors.go +++ b/cmd/lotus-chainwatch/processor/common_actors.go @@ -1,8 +1,8 @@ package processor import ( + "bytes" "context" - "fmt" "time" "golang.org/x/sync/errgroup" @@ -12,7 +12,12 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/lotus/chain/types" + cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" "github.com/filecoin-project/specs-actors/actors/builtin" + _init "github.com/filecoin-project/specs-actors/actors/builtin/init" + "github.com/filecoin-project/specs-actors/actors/util/adt" + typegen "github.com/whyrusleeping/cbor-gen" ) func (p *Processor) setupCommonActors() error { @@ -141,49 +146,41 @@ func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]A addressToID[builtin.StorageMarketActorAddr] = builtin.StorageMarketActorAddr addressToID[builtin.VerifiedRegistryActorAddr] = builtin.VerifiedRegistryActorAddr addressToID[builtin.BurntFundsActorAddr] = builtin.BurntFundsActorAddr - - addressesToUpdate := []UpdateAddresses{} - - pred := state.NewStatePredicates(p.node) - for _, act := range actors[builtin.InitActorCodeID] { - for _, info := range act { - changed, val, err := pred.OnInitActorChange(pred.OnAddressMapChange())(ctx, info.parentTsKey, info.tsKey) - if err != nil { - return err - } - if !changed { - continue - } - changes := val.(*state.InitActorAddressChanges) - for _, add := range changes.Added { - addressToID[add.PK] = add.ID - } - // we'll need to update any addresses that were modified, this indicates a reorg. - for _, mod := range changes.Modified { - addressesToUpdate = append(addressesToUpdate, UpdateAddresses{ - Old: mod.From, - New: mod.To, - }) - } - } - } - - updateTx, err := p.db.Begin() + initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, types.EmptyTSK) if err != nil { return err } - for _, updates := range addressesToUpdate { - if _, err := updateTx.Exec( - fmt.Sprintf("update id_address_map set id=%s, address=%s where id=%s and address=%s", updates.New.ID, updates.New.PK, updates.Old.ID, updates.Old.PK), - ); err != nil { - return err - } - } - if err := updateTx.Commit(); err != nil { + initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head) + if err != nil { return err } + var initActorState _init.State + if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil { + return err + } + ctxStore := cw_util.NewAPIIpldStore(ctx, p.node) + addrMap, err := adt.AsMap(ctxStore, initActorState.AddressMap) + if err != nil { + return err + } + // gross.. + var actorID typegen.CborInt + if err := addrMap.ForEach(&actorID, func(key string) error { + longAddr, err := address.NewFromBytes([]byte(key)) + if err != nil { + return err + } + shortAddr, err := address.NewIDAddress(uint64(actorID)) + if err != nil { + return err + } + addressToID[longAddr] = shortAddr + return nil + }); err != nil { + return err + } tx, err := p.db.Begin() if err != nil { return err @@ -215,8 +212,10 @@ create temp table iam (like id_address_map excluding constraints) on commit drop return err } - if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do nothing`); err != nil { - return xerrors.Errorf("actor put: %w", err) + // HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change + if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil { + log.Warnw("Failed to update id_address_map table, this is a known issue") + return nil } return tx.Commit()