From 62d661f8983c95251fc5c38f46924353cfdc4042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 19 Jan 2020 17:18:47 +0100 Subject: [PATCH] chainwatch: Store json states --- chain/vm/invoker.go | 4 ++ cmd/lotus-chainwatch/storage.go | 84 +++++++++++++++++++++++++++------ cmd/lotus-chainwatch/sync.go | 45 +++++++++++++++--- 3 files changed, 112 insertions(+), 21 deletions(-) diff --git a/chain/vm/invoker.go b/chain/vm/invoker.go index 117ae5338..ab2e54e0f 100644 --- a/chain/vm/invoker.go +++ b/chain/vm/invoker.go @@ -161,6 +161,10 @@ func DecodeParams(b []byte, out interface{}) error { } func DumpActorState(code cid.Cid, b []byte) (interface{}, error) { + if code == actors.AccountCodeCid { // Account code special case + return nil, nil + } + i := newInvoker() // TODO: register builtins in init block typ, ok := i.builtInState[code] diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 9c8e7e7a2..72ccd9dd7 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -75,8 +75,8 @@ create table if not exists blocks eprof bytea, prand bytea, ep0partial bytea, - ep0sector bigint not null, - ep0challangei bigint not null + ep0sector numeric not null, + ep0challangei numeric not null ); create unique index if not exists block_cid_uindex @@ -117,6 +117,22 @@ create index if not exists id_address_map_address_index create index if not exists id_address_map_id_index on id_address_map (id); +create table if not exists actor_states +( + head text not null, + code text not null, + state json not null +); + +create unique index if not exists actor_states_head_code_uindex + on actor_states (head, code); + +create index if not exists actor_states_head_index + on actor_states (head); + +create index if not exists actor_states_code_head_index + on actor_states (head, code); + create table if not exists messages ( cid text not null @@ -124,11 +140,11 @@ create table if not exists messages primary key, "from" text not null, "to" text not null, - nonce int not null, + nonce bigint not null, value text not null, - gasprice int not null, - gaslimit int not null, - method int, + gasprice bigint not null, + gaslimit bigint not null, + method bigint, params bytea ); @@ -184,7 +200,7 @@ create table if not exists miner_heads worker text not null, peerid text not null, sectorsize bigint not null, - power bigint not null, + power decimal not null, active bool, ppe bigint not null, slashed_at bigint not null, @@ -226,17 +242,15 @@ func (st *storage) hasList() map[cid.Cid]struct{} { return out } -func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { +func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error { + // Basic tx, err := st.db.Begin() if err != nil { return err } if _, err := tx.Exec(` - -create temp table a (like actors excluding constraints) on commit drop; - - -`); err != nil { + create temp table a (like actors excluding constraints) on commit drop; + `); err != nil { return xerrors.Errorf("prep temp: %w", err) } @@ -247,7 +261,7 @@ create temp table a (like actors excluding constraints) on commit drop; for addr, acts := range actors { for act, st := range acts { - if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { + if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil { return err } } @@ -261,7 +275,47 @@ create temp table a (like actors excluding constraints) on commit drop; return xerrors.Errorf("actor put: %w", err) } - return tx.Commit() + if err := tx.Commit(); err != nil { + return err + } + + // States + tx, err = st.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(` + create temp table a (like actor_states excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `) + if err != nil { + return err + } + + for _, acts := range actors { + for act, st := range acts { + if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil { + return err + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil } func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index eac861aa8..113d1fa70 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "context" + "encoding/json" "math" "sync" @@ -60,9 +61,14 @@ type minerInfo struct { psize uint64 } +type actorInfo struct { + stateroot cid.Cid + state string +} + func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) { addresses := map[address.Address]address.Address{} - actors := map[address.Address]map[types.Actor]cid.Cid{} + actors := map[address.Address]map[types.Actor]actorInfo{} var alk sync.Mutex log.Infof("Getting synced block list") @@ -150,12 +156,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, act, ts) + if err != nil { + log.Error(err) + return + } + state, err := json.Marshal(ast.State) + if err != nil { + log.Error(err) + return + } + alk.Lock() _, ok := actors[addr] if !ok { - actors[addr] = map[types.Actor]cid.Cid{} + actors[addr] = map[types.Actor]actorInfo{} + } + actors[addr][*act] = actorInfo{ + stateroot: bh.ParentStateRoot, + state: string(state), } - actors[addr][*act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() }) @@ -181,13 +201,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, &act, ts) + if err != nil { + log.Error(err) + return + } + state, err := json.Marshal(ast.State) + if err != nil { + log.Error(err) + return + } alk.Lock() _, ok := actors[addr] if !ok { - actors[addr] = map[types.Actor]cid.Cid{} + actors[addr] = map[types.Actor]actorInfo{} + } + actors[addr][act] = actorInfo{ + stateroot: bh.ParentStateRoot, + state: string(state), } - actors[addr][act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() } @@ -228,7 +261,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS miners[minerKey{ addr: addr, act: actor, - stateroot: c, + stateroot: c.stateroot, }] = &minerInfo{} } }