chainwatch: more improvements

This commit is contained in:
Łukasz Magiera 2019-11-19 13:57:16 +01:00
parent d3b980ef58
commit ced47a11e7
5 changed files with 93 additions and 15 deletions

View File

@ -0,0 +1,26 @@
package main
import (
"context"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
aapi "github.com/filecoin-project/lotus/api"
)
func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) {
sub, err := api.SyncIncomingBlocks(ctx)
if err != nil {
log.Error(err)
return
}
for bh := range sub {
err := st.storeHeaders(map[cid.Cid]*types.BlockHeader{
bh.Cid(): bh,
}, false)
if err != nil {
log.Error(err)
}
}
}

View File

@ -5,7 +5,9 @@ import (
"hash/crc32"
"strconv"
"github.com/ipfs/go-cid"
"gopkg.in/urfave/cli.v2"
)
var dotCmd = &cli.Command{
@ -21,7 +23,7 @@ var dotCmd = &cli.Command{
tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32)
maxH := minH + tosee
res, err := st.db.Query("select block, parent, b.miner from block_parents inner join blocks b on block_parents.block = b.cid where b.height > ? and b.height < ?", minH, maxH)
res, err := st.db.Query("select block, parent, b.miner, b.height from block_parents inner join blocks b on block_parents.block = b.cid where b.height > ? and b.height < ?", minH, maxH)
if err != nil {
return err
}
@ -30,13 +32,26 @@ var dotCmd = &cli.Command{
for res.Next() {
var block, parent, miner string
if err := res.Scan(&block, &parent, &miner); err != nil {
var height uint64
if err := res.Scan(&block, &parent, &miner, &height); err != nil {
return err
}
col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0x80808080 + 0x70707070
bc, err := cid.Parse(block)
if err != nil {
return err
}
fmt.Printf("%s [label = \"%s\", fillcolor = \"#%06x\", style=filled]\n%s -> %s\n", block, miner, col, block, parent)
has := st.hasBlock(bc)
col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030
hasstr := ""
if !has {
hasstr = " UNSYNCED"
}
fmt.Printf("%s [label = \"%s:%d%s\", fillcolor = \"#%06x\", style=filled, forcelabels=true]\n%s -> %s\n", block, miner, height, hasstr, col, block, parent)
}
if res.Err() != nil {
return res.Err()

View File

@ -2,13 +2,15 @@ package main
import (
"fmt"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"net/http"
"os"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"net/http"
"os"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
)
var log = logging.Logger("chainwatch")
@ -76,6 +78,7 @@ var runCmd = &cli.Command{
runSyncer(ctx, api, st)
go subMpool(ctx, api, st)
go subBlocks(ctx, api, st)
h, err := newHandler(api, st)
if err != nil {

View File

@ -2,6 +2,7 @@ package main
import (
"database/sql"
"sync"
"time"
"github.com/ipfs/go-cid"
@ -13,6 +14,8 @@ import (
type storage struct {
db *sql.DB
headerLk sync.Mutex
}
func openStorage() (*storage, error) {
@ -101,6 +104,21 @@ create table if not exists blocks
timestamp int not null
);
create unique index if not exists block_cid_uindex
on blocks (cid);
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
primary key
constraint blocks_synced_blocks_cid_fk
references blocks
);
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced (cid);
create table if not exists block_parents
(
block text not null
@ -178,9 +196,9 @@ create table if not exists miner_heads
return tx.Commit()
}
func (st *storage) hasBlock(bh *types.BlockHeader) bool {
func (st *storage) hasBlock(bh cid.Cid) bool {
var exitsts bool
err := st.db.QueryRow(`select exists (select 1 FROM blocks where cid=?)`, bh.Cid().String()).Scan(&exitsts)
err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=?)`, bh.String()).Scan(&exitsts)
if err != nil {
log.Error(err)
return false
@ -244,7 +262,10 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
return tx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader) error {
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock()
defer st.headerLk.Unlock()
tx, err := st.db.Begin()
if err != nil {
return err
@ -274,6 +295,19 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader) error {
}
}
if sync {
stmt, err := tx.Prepare(`insert into blocks_synced (cid) values (?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String()); err != nil {
return err
}
}
}
return tx.Commit()
}
@ -341,7 +375,7 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
return err
}
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?)`)
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}
@ -367,7 +401,7 @@ func (st *storage) storeMpoolInclusion(msg cid.Cid) error {
return err
}
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?)`)
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}

View File

@ -64,7 +64,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh) {
if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh.Cid()) {
continue
}
@ -213,7 +213,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
}
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync); err != nil {
if err := st.storeHeaders(toSync, true); err != nil {
log.Error(err)
return
}