Merge pull request #1115 from filecoin-project/feat/chainwatch-json-states

chainwatch: Store json states
This commit is contained in:
Łukasz Magiera 2020-01-21 17:15:33 +01:00 committed by GitHub
commit 7b295d7bb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 215 additions and 29 deletions

View File

@ -161,6 +161,10 @@ func DecodeParams(b []byte, out interface{}) error {
}
func DumpActorState(code cid.Cid, b []byte) (interface{}, error) {
if code == actors.AccountCodeCid { // Account code special case
return nil, nil
}
i := newInvoker() // TODO: register builtins in init block
typ, ok := i.builtInState[code]

View File

@ -2,16 +2,18 @@ package main
import (
"database/sql"
"encoding/hex"
"fmt"
"github.com/filecoin-project/lotus/api"
"golang.org/x/xerrors"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
@ -75,8 +77,8 @@ create table if not exists blocks
eprof bytea,
prand bytea,
ep0partial bytea,
ep0sector bigint not null,
ep0challangei bigint not null
ep0sector numeric not null,
ep0challangei numeric not null
);
create unique index if not exists block_cid_uindex
@ -117,6 +119,22 @@ create index if not exists id_address_map_address_index
create index if not exists id_address_map_id_index
on id_address_map (id);
create table if not exists actor_states
(
head text not null,
code text not null,
state json not null
);
create unique index if not exists actor_states_head_code_uindex
on actor_states (head, code);
create index if not exists actor_states_head_index
on actor_states (head);
create index if not exists actor_states_code_head_index
on actor_states (head, code);
create table if not exists messages
(
cid text not null
@ -124,11 +142,11 @@ create table if not exists messages
primary key,
"from" text not null,
"to" text not null,
nonce int not null,
nonce bigint not null,
value text not null,
gasprice int not null,
gaslimit int not null,
method int,
gasprice bigint not null,
gaslimit bigint not null,
method bigint,
params bytea
);
@ -184,7 +202,7 @@ create table if not exists miner_heads
worker text not null,
peerid text not null,
sectorsize bigint not null,
power bigint not null,
power decimal not null,
active bool,
ppe bigint not null,
slashed_at bigint not null,
@ -192,6 +210,33 @@ create table if not exists miner_heads
primary key (head, addr)
);
create table if not exists deals
(
id int not null,
pieceRef text not null,
pieceSize bigint not null,
client text not null,
provider text not null,
expiration decimal not null,
duration decimal not null,
epochPrice decimal not null,
collateral decimal not null,
constraint deals_pk
primary key (id)
);
create index if not exists deals_client_index
on deals (client);
create unique index if not exists deals_id_uindex
on deals (id);
create index if not exists deals_pieceRef_index
on deals (pieceRef);
create index if not exists deals_provider_index
on deals (provider);
`)
if err != nil {
return err
@ -226,17 +271,15 @@ func (st *storage) hasList() map[cid.Cid]struct{} {
return out
}
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error {
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error {
// Basic
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
@ -247,7 +290,7 @@ create temp table a (like actors excluding constraints) on commit drop;
for addr, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil {
return err
}
}
@ -261,7 +304,47 @@ create temp table a (like actors excluding constraints) on commit drop;
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
if err := tx.Commit(); err != nil {
return err
}
// States
tx, err = st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `)
if err != nil {
return err
}
for _, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
@ -602,11 +685,8 @@ func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
}
if _, err := tx.Exec(`
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
@ -639,6 +719,59 @@ create temp table mi (like mpool_messages excluding constraints) on commit drop;
return tx.Commit()
}
func (st *storage) storeDeals(deals map[string]actors.OnChainDeal) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table d (like deals excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy d (id, pieceref, piecesize, client, "provider", expiration, duration, epochprice, collateral) from stdin `)
if err != nil {
return err
}
var bloat uint64
for id, deal := range deals {
if len(deal.PieceRef) > 40 {
bloat += uint64(len(deal.PieceRef))
continue
}
if _, err := stmt.Exec(
id,
hex.EncodeToString(deal.PieceRef),
deal.PieceSize,
deal.Client.String(),
deal.Provider.String(),
fmt.Sprint(deal.ProposalExpiration),
fmt.Sprint(deal.Duration),
deal.StoragePricePerEpoch.String(),
deal.StorageCollateral.String(),
); err != nil {
return err
}
}
if bloat > 0 {
log.Warnf("deal PieceRefs had %d bytes of garbage", bloat)
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into deals select * from d on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"container/list"
"context"
"encoding/json"
"math"
"sync"
@ -60,9 +61,14 @@ type minerInfo struct {
psize uint64
}
type actorInfo struct {
stateroot cid.Cid
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
addresses := map[address.Address]address.Address{}
actors := map[address.Address]map[types.Actor]cid.Cid{}
actors := map[address.Address]map[types.Actor]actorInfo{}
var alk sync.Mutex
log.Infof("Getting synced block list")
@ -150,12 +156,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
}
actors[addr][*act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
})
@ -181,13 +201,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, &act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
}
actors[addr][act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
}
@ -228,7 +261,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
miners[minerKey{
addr: addr,
act: actor,
stateroot: c,
stateroot: c.stateroot,
}] = &minerInfo{}
}
}
@ -322,6 +355,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Sync stage done")
}
log.Infof("Get deals")
// TODO: incremental, gather expired
deals, err := api.StateMarketDeals(ctx, ts)
if err != nil {
log.Error(err)
return
}
log.Infof("Store deals")
if err := st.storeDeals(deals); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}