diff --git a/chain/vm/invoker.go b/chain/vm/invoker.go index 117ae5338..ab2e54e0f 100644 --- a/chain/vm/invoker.go +++ b/chain/vm/invoker.go @@ -161,6 +161,10 @@ func DecodeParams(b []byte, out interface{}) error { } func DumpActorState(code cid.Cid, b []byte) (interface{}, error) { + if code == actors.AccountCodeCid { // Account code special case + return nil, nil + } + i := newInvoker() // TODO: register builtins in init block typ, ok := i.builtInState[code] diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 9c8e7e7a2..b4d219960 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,16 +2,18 @@ package main import ( "database/sql" + "encoding/hex" "fmt" - "github.com/filecoin-project/lotus/api" - "golang.org/x/xerrors" "sync" "time" + "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" _ "github.com/lib/pq" + "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/types" ) @@ -75,8 +77,8 @@ create table if not exists blocks eprof bytea, prand bytea, ep0partial bytea, - ep0sector bigint not null, - ep0challangei bigint not null + ep0sector numeric not null, + ep0challangei numeric not null ); create unique index if not exists block_cid_uindex @@ -117,6 +119,22 @@ create index if not exists id_address_map_address_index create index if not exists id_address_map_id_index on id_address_map (id); +create table if not exists actor_states +( + head text not null, + code text not null, + state json not null +); + +create unique index if not exists actor_states_head_code_uindex + on actor_states (head, code); + +create index if not exists actor_states_head_index + on actor_states (head); + +create index if not exists actor_states_code_head_index + on actor_states (head, code); + create table if not exists messages ( cid text not null @@ -124,11 +142,11 @@ create table if not exists messages primary key, "from" text not null, "to" text not null, - nonce int not null, + nonce bigint not null, value text not null, - gasprice int not null, - gaslimit int not null, - method int, + gasprice bigint not null, + gaslimit bigint not null, + method bigint, params bytea ); @@ -184,7 +202,7 @@ create table if not exists miner_heads worker text not null, peerid text not null, sectorsize bigint not null, - power bigint not null, + power decimal not null, active bool, ppe bigint not null, slashed_at bigint not null, @@ -192,6 +210,33 @@ create table if not exists miner_heads primary key (head, addr) ); +create table if not exists deals +( + id int not null, + pieceRef text not null, + pieceSize bigint not null, + client text not null, + provider text not null, + expiration decimal not null, + duration decimal not null, + epochPrice decimal not null, + collateral decimal not null, + constraint deals_pk + primary key (id) +); + +create index if not exists deals_client_index + on deals (client); + +create unique index if not exists deals_id_uindex + on deals (id); + +create index if not exists deals_pieceRef_index + on deals (pieceRef); + +create index if not exists deals_provider_index + on deals (provider); + `) if err != nil { return err @@ -226,17 +271,15 @@ func (st *storage) hasList() map[cid.Cid]struct{} { return out } -func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { +func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error { + // Basic tx, err := st.db.Begin() if err != nil { return err } if _, err := tx.Exec(` - -create temp table a (like actors excluding constraints) on commit drop; - - -`); err != nil { + create temp table a (like actors excluding constraints) on commit drop; + `); err != nil { return xerrors.Errorf("prep temp: %w", err) } @@ -247,7 +290,7 @@ create temp table a (like actors excluding constraints) on commit drop; for addr, acts := range actors { for act, st := range acts { - if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { + if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil { return err } } @@ -261,7 +304,47 @@ create temp table a (like actors excluding constraints) on commit drop; return xerrors.Errorf("actor put: %w", err) } - return tx.Commit() + if err := tx.Commit(); err != nil { + return err + } + + // States + tx, err = st.db.Begin() + if err != nil { + return err + } + if _, err := tx.Exec(` + create temp table a (like actor_states excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `) + if err != nil { + return err + } + + for _, acts := range actors { + for act, st := range acts { + if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil { + return err + } + } + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil } func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { @@ -602,11 +685,8 @@ func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error { } if _, err := tx.Exec(` - -create temp table mi (like mpool_messages excluding constraints) on commit drop; - - -`); err != nil { + create temp table mi (like mpool_messages excluding constraints) on commit drop; + `); err != nil { return xerrors.Errorf("prep temp: %w", err) } @@ -639,6 +719,59 @@ create temp table mi (like mpool_messages excluding constraints) on commit drop; return tx.Commit() } +func (st *storage) storeDeals(deals map[string]actors.OnChainDeal) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(` + create temp table d (like deals excluding constraints) on commit drop; + `); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy d (id, pieceref, piecesize, client, "provider", expiration, duration, epochprice, collateral) from stdin `) + if err != nil { + return err + } + + var bloat uint64 + + for id, deal := range deals { + if len(deal.PieceRef) > 40 { + bloat += uint64(len(deal.PieceRef)) + continue + } + if _, err := stmt.Exec( + id, + hex.EncodeToString(deal.PieceRef), + deal.PieceSize, + deal.Client.String(), + deal.Provider.String(), + fmt.Sprint(deal.ProposalExpiration), + fmt.Sprint(deal.Duration), + deal.StoragePricePerEpoch.String(), + deal.StorageCollateral.String(), + ); err != nil { + return err + } + } + if bloat > 0 { + log.Warnf("deal PieceRefs had %d bytes of garbage", bloat) + } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into deals select * from d on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() +} + func (st *storage) close() error { return st.db.Close() } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index eac861aa8..33d6605f2 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "context" + "encoding/json" "math" "sync" @@ -60,9 +61,14 @@ type minerInfo struct { psize uint64 } +type actorInfo struct { + stateroot cid.Cid + state string +} + func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) { addresses := map[address.Address]address.Address{} - actors := map[address.Address]map[types.Actor]cid.Cid{} + actors := map[address.Address]map[types.Actor]actorInfo{} var alk sync.Mutex log.Infof("Getting synced block list") @@ -150,12 +156,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, act, ts) + if err != nil { + log.Error(err) + return + } + state, err := json.Marshal(ast.State) + if err != nil { + log.Error(err) + return + } + alk.Lock() _, ok := actors[addr] if !ok { - actors[addr] = map[types.Actor]cid.Cid{} + actors[addr] = map[types.Actor]actorInfo{} + } + actors[addr][*act] = actorInfo{ + stateroot: bh.ParentStateRoot, + state: string(state), } - actors[addr][*act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() }) @@ -181,13 +201,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Error(err) return } + ast, err := api.StateReadState(ctx, &act, ts) + if err != nil { + log.Error(err) + return + } + state, err := json.Marshal(ast.State) + if err != nil { + log.Error(err) + return + } alk.Lock() _, ok := actors[addr] if !ok { - actors[addr] = map[types.Actor]cid.Cid{} + actors[addr] = map[types.Actor]actorInfo{} + } + actors[addr][act] = actorInfo{ + stateroot: bh.ParentStateRoot, + state: string(state), } - actors[addr][act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() } @@ -228,7 +261,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS miners[minerKey{ addr: addr, act: actor, - stateroot: c, + stateroot: c.stateroot, }] = &minerInfo{} } } @@ -322,6 +355,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Sync stage done") } + log.Infof("Get deals") + + // TODO: incremental, gather expired + deals, err := api.StateMarketDeals(ctx, ts) + if err != nil { + log.Error(err) + return + } + + log.Infof("Store deals") + + if err := st.storeDeals(deals); err != nil { + log.Error(err) + return + } + log.Infof("Sync done") }