From 820f7bfb8a536d60cc025d4e66b9f39dc0f71ad6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 15 Nov 2019 19:37:57 +0100 Subject: [PATCH] chainwatch: Capture more state --- cmd/lotus-chainwatch/storage.go | 97 ++++++++++++++++++++-- cmd/lotus-chainwatch/sync.go | 140 +++++++++++++++++++++----------- cmd/lotus-chainwatch/utils.go | 34 +++++++- 3 files changed, 212 insertions(+), 59 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c6c6c473e..2df5821de 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,9 +2,12 @@ package main import ( "database/sql" - "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" _ "github.com/mattn/go-sqlite3" + + "github.com/filecoin-project/lotus/chain/address" + "github.com/filecoin-project/lotus/chain/types" ) type storage struct { @@ -28,13 +31,39 @@ func (st *storage) setup() error { return err } _, err = tx.Exec(` +create table actors + ( + id text not null, + code text not null, + head text not null, + nonce int not null, + balance text, + constraint actors_pk + unique (id, code, head, nonce, balance) + ); + +create table id_address_map +( + id text not null + constraint id_address_map_actors_id_fk + references actors (id), + address text not null, + constraint id_address_map_pk + primary key (id, address) +); + + create table messages ( cid text not null constraint messages_pk primary key, - "from" text not null, - "to" text not null, + "from" text not null + constraint messages_id_address_map_from_fk + references id_address_map (address), + "to" text not null + constraint messages_id_address_map_to_fk + references id_address_map (address), nonce int not null, value text not null, gasprice int not null, @@ -53,7 +82,10 @@ create table blocks primary key, parentWeight numeric not null, height int not null, - timestamp text not null + miner text not null + constraint blocks_id_address_map_miner_fk + references id_address_map (address), + timestamp int not null ); create unique index blocks_cid_uindex @@ -87,19 +119,41 @@ func (st *storage) hasBlock(bh *types.BlockHeader) bool { return exitsts } -func (st *storage) storeHeaders(bhs []*types.BlockHeader) error { +func (st *storage) storeActors(actors map[address.Address]map[types.Actor]struct{}) error { tx, err := st.db.Begin() if err != nil { return err } - stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, "timestamp") values (?, ?, ?, ?)`) + stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance) values (?, ?, ?, ?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + for addr, acts := range actors { + for act, _ := range acts { + if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String()); err != nil { + return err + } + } + } + + return tx.Commit() +} + +func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, miner, "timestamp") values (?, ?, ?, ?, ?) 
on conflict do nothing`) if err != nil { return err } defer stmt.Close() for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Timestamp); err != nil { + if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { return err } } @@ -113,7 +167,7 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } - stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`) + stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`) if err != nil { return err } @@ -138,6 +192,33 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return tx.Commit() } +func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES (?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + + for a, i := range addrs { + if i == address.Undef { + continue + } + if _, err := stmt.Exec( + i.String(), + a.String(), + ); err != nil { + return err + } + } + + return tx.Commit() +} + func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { tx, err := st.db.Begin() if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 66fa4e19a..58dc6943c 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -3,7 +3,7 @@ package main import ( "container/list" "context" - "fmt" + "github.com/filecoin-project/lotus/chain/address" "sync" "github.com/ipfs/go-cid" @@ -35,7 +35,13 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) { } func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) { - var toSync []*types.BlockHeader + addresses := map[address.Address]address.Address{} + actors := map[address.Address]map[types.Actor]struct{}{} + var alk sync.Mutex + + log.Infof("Getting headers / actors") + + toSync := map[cid.Cid]*types.BlockHeader{} toVisit := list.New() for _, header := range ts.Blocks() { @@ -45,15 +51,19 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for toVisit.Len() > 0 { bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) - if !st.hasBlock(bh) { - toSync = append(toSync, bh) + if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh) { + continue } - if len(toSync)%500 == 0 { + + toSync[bh.Cid()] = bh + addresses[bh.Miner] = address.Undef + + if len(toSync)%500 == 10 { log.Infof("todo: (%d) %s", len(toSync), bh.Cid()) } if len(bh.Parents) == 0 { - break + continue } pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) @@ -69,6 +79,42 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Syncing %d blocks", len(toSync)) + log.Infof("Persisting actors") + + paDone := 0 + par(40, maparr(toSync), func(bh *types.BlockHeader) { + paDone++ + if paDone%100 == 0 { + log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync)) + } + ts, err := types.NewTipSet([]*types.BlockHeader{bh}) + aadrs, err := api.StateListActors(ctx, ts) + if err != nil { + return + } + + par(50, aadrs, func(addr address.Address) { + act, err := api.StateGetActor(ctx, 
addr, ts) + if err != nil { + log.Error(err) + return + } + alk.Lock() + _, ok := actors[addr] + if !ok { + actors[addr] = map[types.Actor]struct{}{} + } + actors[addr][*act] = struct{}{} + addresses[addr] = address.Undef + alk.Unlock() + }) + }) + + if err := st.storeActors(actors); err != nil { + log.Error(err) + return + } + log.Infof("Persisting headers") if err := st.storeHeaders(toSync); err != nil { log.Error(err) @@ -89,64 +135,60 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS return } - log.Infof("Getting actors") - // TODO: for now this assumes that actor can't be removed + log.Infof("Resolving addresses") - /* aadrs, err := api.StateListActors(ctx, ts) + for _, message := range msgs { + addresses[message.To] = address.Undef + addresses[message.From] = address.Undef + } + + par(50, kmaparr(addresses), func(addr address.Address) { + raddr, err := api.StateLookupID(ctx, addr, nil) if err != nil { + log.Warn(err) return - }*/ + } + alk.Lock() + addresses[addr] = raddr + alk.Unlock() + }) + + if err := st.storeAddressMap(addresses); err != nil { + log.Error(err) + return + } log.Infof("Sync done") } -func fetchMessages(ctx context.Context, api api.FullNode, toSync []*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) { +func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) { var lk sync.Mutex messages := map[cid.Cid]*types.Message{} inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs - throttle := make(chan struct{}, 50) - var wg sync.WaitGroup - - for _, header := range toSync { - if header.Height%30 == 0 { - fmt.Printf("\rh: %d", header.Height) + par(50, maparr(toSync), func(header *types.BlockHeader) { + msgs, err := api.ChainGetBlockMessages(ctx, header.Cid()) + if err != nil { + log.Error(err) + return } - throttle <- struct{}{} - wg.Add(1) + vmm := make([]*types.Message, 0, len(msgs.Cids)) + for _, m := range msgs.BlsMessages { + vmm = append(vmm, m) + } - go func(header cid.Cid) { - defer wg.Done() - defer func() { - <-throttle - }() + for _, m := range msgs.SecpkMessages { + vmm = append(vmm, &m.Message) + } - msgs, err := api.ChainGetBlockMessages(ctx, header) - if err != nil { - log.Error(err) - return - } - - vmm := make([]*types.Message, 0, len(msgs.Cids)) - for _, m := range msgs.BlsMessages { - vmm = append(vmm, m) - } - - for _, m := range msgs.SecpkMessages { - vmm = append(vmm, &m.Message) - } - - lk.Lock() - for _, message := range vmm { - messages[message.Cid()] = message - inclusions[header] = append(inclusions[header], message.Cid()) - } - lk.Unlock() - - }(header.Cid()) - } - wg.Wait() + lk.Lock() + for _, message := range vmm { + messages[message.Cid()] = message + inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid()) + } + lk.Unlock() + }) return messages, inclusions } diff --git a/cmd/lotus-chainwatch/utils.go b/cmd/lotus-chainwatch/utils.go index 00c8e95fd..82a2a0110 100644 --- a/cmd/lotus-chainwatch/utils.go +++ b/cmd/lotus-chainwatch/utils.go @@ -5,6 +5,34 @@ import ( "sync" ) +func maparr(in interface{}) interface{} { + rin := reflect.ValueOf(in) + rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Elem()), rin.Len(), rin.Len()) + var i int + + it := rin.MapRange() + for it.Next() { + rout.Index(i).Set(it.Value()) + i++ + } + + return rout.Interface() +} + +func kmaparr(in interface{}) interface{} { + rin := reflect.ValueOf(in) + rout := 
reflect.MakeSlice(reflect.SliceOf(rin.Type().Key()), rin.Len(), rin.Len())
+	var i int
+
+	it := rin.MapRange()
+	for it.Next() {
+		rout.Index(i).Set(it.Key())
+		i++
+	}
+
+	return rout.Interface()
+}
+
 func par(concurrency int, arr interface{}, f interface{}) {
 	throttle := make(chan struct{}, concurrency)
 	var wg sync.WaitGroup
@@ -18,12 +46,14 @@ func par(concurrency int, arr interface{}, f interface{}) {
 	for i := 0; i < l; i++ {
 		throttle <- struct{}{}
 
-		go func() {
+		go func(i int) {
 			defer wg.Done()
 			defer func() {
 				<-throttle
 			}()
 
 			rf.Call([]reflect.Value{varr.Index(i)})
-		}()
+		}(i)
 	}
+
+	wg.Wait()
 }
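
The new id_address_map table is what lets rows in messages and blocks (which store addresses exactly as they appear in messages and block headers) be joined back to the ID addresses that the actors table uses as its id. Below is a minimal sketch of reading the resulting database with the same sqlite3 driver the patch uses; the database path and the exact query are illustrative assumptions, only the table and column names come from the schema above:

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3"
)

func main() {
	// Path is an assumption; open whatever file the chainwatch daemon writes to.
	db, err := sql.Open("sqlite3", "./chainwatch.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Resolve message senders to their ID addresses via id_address_map,
	// then count messages per sender ID.
	rows, err := db.Query(`
		select a.id, count(*) as n
		from messages m
		join id_address_map a on a.address = m."from"
		group by a.id
		order by n desc
		limit 10`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var id string
		var n int
		if err := rows.Scan(&id, &n); err != nil {
			log.Fatal(err)
		}
		fmt.Println(id, n)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}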