315 lines
7.8 KiB
Go
315 lines
7.8 KiB
Go
package processor
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/lotus/chain/events/state"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
|
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
|
_init "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
|
"github.com/filecoin-project/specs-actors/actors/util/adt"
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/multiformats/go-multihash"
|
|
typegen "github.com/whyrusleeping/cbor-gen"
|
|
)
|
|
|
|
func (p *Processor) setupCommonActors() error {
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`
|
|
create table if not exists id_address_map
|
|
(
|
|
id text not null,
|
|
address text not null,
|
|
constraint id_address_map_pk
|
|
primary key (id, address)
|
|
);
|
|
|
|
create unique index if not exists id_address_map_id_uindex
|
|
on id_address_map (id);
|
|
|
|
create unique index if not exists id_address_map_address_uindex
|
|
on id_address_map (address);
|
|
|
|
create table if not exists actors
|
|
(
|
|
id text not null
|
|
constraint id_address_map_actors_id_fk
|
|
references id_address_map (id),
|
|
code text not null,
|
|
head text not null,
|
|
nonce int not null,
|
|
balance text not null,
|
|
stateroot text
|
|
);
|
|
|
|
create index if not exists actors_id_index
|
|
on actors (id);
|
|
|
|
create index if not exists id_address_map_address_index
|
|
on id_address_map (address);
|
|
|
|
create index if not exists id_address_map_id_index
|
|
on id_address_map (id);
|
|
|
|
create or replace function actor_tips(epoch bigint)
|
|
returns table (id text,
|
|
code text,
|
|
head text,
|
|
nonce int,
|
|
balance text,
|
|
stateroot text,
|
|
height bigint,
|
|
parentstateroot text) as
|
|
$body$
|
|
select distinct on (id) * from actors
|
|
inner join state_heights sh on sh.parentstateroot = stateroot
|
|
where height < $1
|
|
order by id, height desc;
|
|
$body$ language sql;
|
|
|
|
create table if not exists actor_states
|
|
(
|
|
head text not null,
|
|
code text not null,
|
|
state json not null
|
|
);
|
|
|
|
create unique index if not exists actor_states_head_code_uindex
|
|
on actor_states (head, code);
|
|
|
|
create index if not exists actor_states_head_index
|
|
on actor_states (head);
|
|
|
|
create index if not exists actor_states_code_head_index
|
|
on actor_states (head, code);
|
|
|
|
`); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[cid.Cid]ActorTips) error {
|
|
if err := p.storeActorAddresses(ctx, actors); err != nil {
|
|
return err
|
|
}
|
|
|
|
grp, _ := errgroup.WithContext(ctx)
|
|
|
|
grp.Go(func() error {
|
|
if err := p.storeActorHeads(actors); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
if err := p.storeActorStates(actors); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return grp.Wait()
|
|
}
|
|
|
|
type UpdateAddresses struct {
|
|
Old state.AddressPair
|
|
New state.AddressPair
|
|
}
|
|
|
|
func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Stored Actor Addresses", "duration", time.Since(start).String())
|
|
}()
|
|
|
|
addressToID := map[address.Address]address.Address{}
|
|
// HACK until genesis storage is figured out:
|
|
addressToID[builtin.SystemActorAddr] = builtin.SystemActorAddr
|
|
addressToID[builtin.InitActorAddr] = builtin.InitActorAddr
|
|
addressToID[builtin.RewardActorAddr] = builtin.RewardActorAddr
|
|
addressToID[builtin.CronActorAddr] = builtin.CronActorAddr
|
|
addressToID[builtin.StoragePowerActorAddr] = builtin.StoragePowerActorAddr
|
|
addressToID[builtin.StorageMarketActorAddr] = builtin.StorageMarketActorAddr
|
|
addressToID[builtin.VerifiedRegistryActorAddr] = builtin.VerifiedRegistryActorAddr
|
|
addressToID[builtin.BurntFundsActorAddr] = builtin.BurntFundsActorAddr
|
|
initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, types.EmptyTSK)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var initActorState _init.State
|
|
if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil {
|
|
return err
|
|
}
|
|
ctxStore := cw_util.NewAPIIpldStore(ctx, p.node)
|
|
addrMap, err := adt.AsMap(ctxStore, initActorState.AddressMap)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// gross..
|
|
var actorID typegen.CborInt
|
|
if err := addrMap.ForEach(&actorID, func(key string) error {
|
|
longAddr, err := address.NewFromBytes([]byte(key))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
shortAddr, err := address.NewIDAddress(uint64(actorID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addressToID[longAddr] = shortAddr
|
|
return nil
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`
|
|
create temp table iam (like id_address_map excluding constraints) on commit drop;
|
|
`); err != nil {
|
|
return xerrors.Errorf("prep temp: %w", err)
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for a, i := range addressToID {
|
|
if i == address.Undef {
|
|
continue
|
|
}
|
|
if _, err := stmt.Exec(
|
|
i.String(),
|
|
a.String(),
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change
|
|
if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil {
|
|
log.Warnw("Failed to update id_address_map table, this is a known issue")
|
|
return nil
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Stored Actor Heads", "duration", time.Since(start).String())
|
|
}()
|
|
// Basic
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Exec(`
|
|
create temp table a (like actors excluding constraints) on commit drop;
|
|
`); err != nil {
|
|
return xerrors.Errorf("prep temp: %w", err)
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for code, actTips := range actors {
|
|
actorName := code.String()
|
|
if s, err := multihash.Decode(code.Hash()); err != nil {
|
|
actorName = string(s.Digest)
|
|
}
|
|
for _, actorInfo := range actTips {
|
|
for _, a := range actorInfo {
|
|
if _, err := stmt.Exec(a.addr.String(), actorName, a.act.Head.String(), a.act.Nonce, a.act.Balance.String(), a.stateroot.String()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil {
|
|
return xerrors.Errorf("actor put: %w", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Stored Actor States", "duration", time.Since(start).String())
|
|
}()
|
|
// States
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Exec(`
|
|
create temp table a (like actor_states excluding constraints) on commit drop;
|
|
`); err != nil {
|
|
return xerrors.Errorf("prep temp: %w", err)
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`copy a (head, code, state) from stdin `)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for code, actTips := range actors {
|
|
actorName := code.String()
|
|
if s, err := multihash.Decode(code.Hash()); err != nil {
|
|
actorName = string(s.Digest)
|
|
}
|
|
for _, actorInfo := range actTips {
|
|
for _, a := range actorInfo {
|
|
if _, err := stmt.Exec(a.act.Head.String(), actorName, a.state); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil {
|
|
return xerrors.Errorf("actor put: %w", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|