chainwatch: Store json states

This commit is contained in:
Łukasz Magiera 2020-01-19 17:18:47 +01:00
parent 4436148fa1
commit 62d661f898
3 changed files with 112 additions and 21 deletions

View File

@ -161,6 +161,10 @@ func DecodeParams(b []byte, out interface{}) error {
} }
func DumpActorState(code cid.Cid, b []byte) (interface{}, error) { func DumpActorState(code cid.Cid, b []byte) (interface{}, error) {
if code == actors.AccountCodeCid { // Account code special case
return nil, nil
}
i := newInvoker() // TODO: register builtins in init block i := newInvoker() // TODO: register builtins in init block
typ, ok := i.builtInState[code] typ, ok := i.builtInState[code]

View File

@ -75,8 +75,8 @@ create table if not exists blocks
eprof bytea, eprof bytea,
prand bytea, prand bytea,
ep0partial bytea, ep0partial bytea,
ep0sector bigint not null, ep0sector numeric not null,
ep0challangei bigint not null ep0challangei numeric not null
); );
create unique index if not exists block_cid_uindex create unique index if not exists block_cid_uindex
@ -117,6 +117,22 @@ create index if not exists id_address_map_address_index
create index if not exists id_address_map_id_index create index if not exists id_address_map_id_index
on id_address_map (id); on id_address_map (id);
create table if not exists actor_states
(
head text not null,
code text not null,
state json not null
);
create unique index if not exists actor_states_head_code_uindex
on actor_states (head, code);
create index if not exists actor_states_head_index
on actor_states (head);
create index if not exists actor_states_code_head_index
on actor_states (head, code);
create table if not exists messages create table if not exists messages
( (
cid text not null cid text not null
@ -124,11 +140,11 @@ create table if not exists messages
primary key, primary key,
"from" text not null, "from" text not null,
"to" text not null, "to" text not null,
nonce int not null, nonce bigint not null,
value text not null, value text not null,
gasprice int not null, gasprice bigint not null,
gaslimit int not null, gaslimit bigint not null,
method int, method bigint,
params bytea params bytea
); );
@ -184,7 +200,7 @@ create table if not exists miner_heads
worker text not null, worker text not null,
peerid text not null, peerid text not null,
sectorsize bigint not null, sectorsize bigint not null,
power bigint not null, power decimal not null,
active bool, active bool,
ppe bigint not null, ppe bigint not null,
slashed_at bigint not null, slashed_at bigint not null,
@ -226,17 +242,15 @@ func (st *storage) hasList() map[cid.Cid]struct{} {
return out return out
} }
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error {
// Basic
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table a (like actors excluding constraints) on commit drop;
create temp table a (like actors excluding constraints) on commit drop; `); err != nil {
`); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
@ -247,7 +261,7 @@ create temp table a (like actors excluding constraints) on commit drop;
for addr, acts := range actors { for addr, acts := range actors {
for act, st := range acts { for act, st := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil {
return err return err
} }
} }
@ -261,7 +275,47 @@ create temp table a (like actors excluding constraints) on commit drop;
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }
return tx.Commit() if err := tx.Commit(); err != nil {
return err
}
// States
tx, err = st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `)
if err != nil {
return err
}
for _, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
} }
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"container/list" "container/list"
"context" "context"
"encoding/json"
"math" "math"
"sync" "sync"
@ -60,9 +61,14 @@ type minerInfo struct {
psize uint64 psize uint64
} }
type actorInfo struct {
stateroot cid.Cid
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) { func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
addresses := map[address.Address]address.Address{} addresses := map[address.Address]address.Address{}
actors := map[address.Address]map[types.Actor]cid.Cid{} actors := map[address.Address]map[types.Actor]actorInfo{}
var alk sync.Mutex var alk sync.Mutex
log.Infof("Getting synced block list") log.Infof("Getting synced block list")
@ -150,12 +156,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err) log.Error(err)
return return
} }
ast, err := api.StateReadState(ctx, act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock() alk.Lock()
_, ok := actors[addr] _, ok := actors[addr]
if !ok { if !ok {
actors[addr] = map[types.Actor]cid.Cid{} actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
} }
actors[addr][*act] = bh.ParentStateRoot
addresses[addr] = address.Undef addresses[addr] = address.Undef
alk.Unlock() alk.Unlock()
}) })
@ -181,13 +201,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err) log.Error(err)
return return
} }
ast, err := api.StateReadState(ctx, &act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock() alk.Lock()
_, ok := actors[addr] _, ok := actors[addr]
if !ok { if !ok {
actors[addr] = map[types.Actor]cid.Cid{} actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
} }
actors[addr][act] = bh.ParentStateRoot
addresses[addr] = address.Undef addresses[addr] = address.Undef
alk.Unlock() alk.Unlock()
} }
@ -228,7 +261,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
miners[minerKey{ miners[minerKey{
addr: addr, addr: addr,
act: actor, act: actor,
stateroot: c, stateroot: c.stateroot,
}] = &minerInfo{} }] = &minerInfo{}
} }
} }