chainwatch: Capture more state

This commit is contained in:
Łukasz Magiera 2019-11-15 19:37:57 +01:00
parent 8ac65cde80
commit 820f7bfb8a
3 changed files with 212 additions and 59 deletions

View File

@ -2,9 +2,12 @@ package main
import (
"database/sql"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
)
type storage struct {
@ -28,13 +31,39 @@ func (st *storage) setup() error {
return err
}
_, err = tx.Exec(`
create table actors
(
id text not null,
code text not null,
head text not null,
nonce int not null,
balance text,
constraint actors_pk
unique (id, code, head, nonce, balance)
);
create table id_address_map
(
id text not null
constraint id_address_map_actors_id_fk
references actors (id),
address text not null,
constraint id_address_map_pk
primary key (id, address)
);
create table messages
(
cid text not null
constraint messages_pk
primary key,
"from" text not null,
"to" text not null,
"from" text not null
constraint messages_id_address_map_from_fk
references id_address_map (address),
"to" text not null
constraint messages_id_address_map_to_fk
references id_address_map (address),
nonce int not null,
value text not null,
gasprice int not null,
@ -53,7 +82,10 @@ create table blocks
primary key,
parentWeight numeric not null,
height int not null,
timestamp text not null
miner text not null
constraint blocks_id_address_map_miner_fk
references id_address_map (address),
timestamp int not null
);
create unique index blocks_cid_uindex
@ -87,19 +119,41 @@ func (st *storage) hasBlock(bh *types.BlockHeader) bool {
return exitsts
}
func (st *storage) storeHeaders(bhs []*types.BlockHeader) error {
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]struct{}) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, "timestamp") values (?, ?, ?, ?)`)
stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance) values (?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for addr, acts := range actors {
for act, _ := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String()); err != nil {
return err
}
}
}
return tx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, miner, "timestamp") values (?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Timestamp); err != nil {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
return err
}
}
@ -113,7 +167,7 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return err
}
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
@ -138,6 +192,33 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return tx.Commit()
}
func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for a, i := range addrs {
if i == address.Undef {
continue
}
if _, err := stmt.Exec(
i.String(),
a.String(),
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {

View File

@ -3,7 +3,7 @@ package main
import (
"container/list"
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/address"
"sync"
"github.com/ipfs/go-cid"
@ -35,7 +35,13 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
var toSync []*types.BlockHeader
addresses := map[address.Address]address.Address{}
actors := map[address.Address]map[types.Actor]struct{}{}
var alk sync.Mutex
log.Infof("Getting headers / actors")
toSync := map[cid.Cid]*types.BlockHeader{}
toVisit := list.New()
for _, header := range ts.Blocks() {
@ -45,15 +51,19 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
if !st.hasBlock(bh) {
toSync = append(toSync, bh)
if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh) {
continue
}
if len(toSync)%500 == 0 {
toSync[bh.Cid()] = bh
addresses[bh.Miner] = address.Undef
if len(toSync)%500 == 10 {
log.Infof("todo: (%d) %s", len(toSync), bh.Cid())
}
if len(bh.Parents) == 0 {
break
continue
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
@ -69,6 +79,42 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Syncing %d blocks", len(toSync))
log.Infof("Persisting actors")
paDone := 0
par(40, maparr(toSync), func(bh *types.BlockHeader) {
paDone++
if paDone%100 == 0 {
log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync))
}
ts, err := types.NewTipSet([]*types.BlockHeader{bh})
aadrs, err := api.StateListActors(ctx, ts)
if err != nil {
return
}
par(50, aadrs, func(addr address.Address) {
act, err := api.StateGetActor(ctx, addr, ts)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]struct{}{}
}
actors[addr][*act] = struct{}{}
addresses[addr] = address.Undef
alk.Unlock()
})
})
if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync); err != nil {
log.Error(err)
@ -89,40 +135,39 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
return
}
log.Infof("Getting actors")
// TODO: for now this assumes that actor can't be removed
log.Infof("Resolving addresses")
/* aadrs, err := api.StateListActors(ctx, ts)
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
}
par(50, kmaparr(addresses), func(addr address.Address) {
raddr, err := api.StateLookupID(ctx, addr, nil)
if err != nil {
log.Warn(err)
return
}*/
}
alk.Lock()
addresses[addr] = raddr
alk.Unlock()
})
if err := st.storeAddressMap(addresses); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}
func fetchMessages(ctx context.Context, api api.FullNode, toSync []*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
var lk sync.Mutex
messages := map[cid.Cid]*types.Message{}
inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs
throttle := make(chan struct{}, 50)
var wg sync.WaitGroup
for _, header := range toSync {
if header.Height%30 == 0 {
fmt.Printf("\rh: %d", header.Height)
}
throttle <- struct{}{}
wg.Add(1)
go func(header cid.Cid) {
defer wg.Done()
defer func() {
<-throttle
}()
msgs, err := api.ChainGetBlockMessages(ctx, header)
par(50, maparr(toSync), func(header *types.BlockHeader) {
msgs, err := api.ChainGetBlockMessages(ctx, header.Cid())
if err != nil {
log.Error(err)
return
@ -140,13 +185,10 @@ func fetchMessages(ctx context.Context, api api.FullNode, toSync []*types.BlockH
lk.Lock()
for _, message := range vmm {
messages[message.Cid()] = message
inclusions[header] = append(inclusions[header], message.Cid())
inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid())
}
lk.Unlock()
}(header.Cid())
}
wg.Wait()
})
return messages, inclusions
}

View File

@ -5,6 +5,34 @@ import (
"sync"
)
func maparr(in interface{}) interface{} {
rin := reflect.ValueOf(in)
rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Elem()), rin.Len(), rin.Len())
var i int
it := rin.MapRange()
for it.Next() {
rout.Index(i).Set(it.Value())
i++
}
return rout.Interface()
}
func kmaparr(in interface{}) interface{} {
rin := reflect.ValueOf(in)
rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Elem()), rin.Len(), rin.Len())
var i int
it := rin.MapRange()
for it.Next() {
rout.Index(i).Set(it.Key())
i++
}
return rout.Interface()
}
func par(concurrency int, arr interface{}, f interface{}) {
throttle := make(chan struct{}, concurrency)
var wg sync.WaitGroup
@ -18,12 +46,14 @@ func par(concurrency int, arr interface{}, f interface{}) {
for i := 0; i < l; i++ {
throttle <- struct{}{}
go func() {
go func(i int) {
defer wg.Done()
defer func() {
<-throttle
}()
rf.Call([]reflect.Value{varr.Index(i)})
}()
}(i)
}
wg.Wait()
}