Begin implementing ChainWatch

This commit is contained in:
Łukasz Magiera 2019-11-15 17:38:56 +01:00
parent 35659af84d
commit 70956589ef
6 changed files with 441 additions and 0 deletions

View File

@ -0,0 +1,88 @@
package main
import (
"fmt"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
logging "github.com/ipfs/go-log"
"gopkg.in/urfave/cli.v2"
"net/http"
"os"
)
var log = logging.Logger("chainwatch")
func main() {
logging.SetLogLevel("*", "INFO")
log.Info("Starting chainwatch")
local := []*cli.Command{
runCmd,
}
app := &cli.App{
Name: "lotus-chainwatch",
Usage: "Devnet token distribution utility",
Version: build.Version,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"LOTUS_PATH"},
Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME
},
},
Commands: local,
}
if err := app.Run(os.Args); err != nil {
log.Warn(err)
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus chainwatch",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "front",
Value: "127.0.0.1:8418",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
v, err := api.Version(ctx)
if err != nil {
return err
}
log.Info("Remote version: %s", v.Version)
//http.Handle("/", http.FileServer(rice.MustFindBox("site").HTTPBox()))
fmt.Printf("Open http://%s\n", cctx.String("front"))
st, err := openStorage()
if err != nil {
return err
}
defer st.close()
runSyncer(ctx, api, st)
go func() {
<-ctx.Done()
os.Exit(0)
}()
return http.ListenAndServe(cctx.String("front"), nil)
},
}

View File

@ -0,0 +1,169 @@
package main
import (
"database/sql"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
)
type storage struct {
db *sql.DB
}
func openStorage() (*storage, error) {
db, err := sql.Open("sqlite3", "./chainwatch.db")
if err != nil {
return nil, err
}
st := &storage{db: db}
return st, st.setup()
}
func (st *storage) setup() error {
tx, err := st.db.Begin()
if err != nil {
return err
}
_, err = tx.Exec(`
create table messages
(
cid text not null
constraint messages_pk
primary key,
"from" text not null,
"to" text not null,
nonce int not null,
value text not null,
gasprice int not null,
gaslimit int not null,
method int,
params blob
);
create unique index messages_cid_uindex
on messages (cid);
create table blocks
(
cid text not null
constraint blocks_pk
primary key,
parentWeight numeric not null,
height int not null,
timestamp text not null
);
create unique index blocks_cid_uindex
on blocks (cid);
create table block_messages
(
block text not null
constraint block_messages_blk_fk
references blocks (cid),
message text not null
constraint block_messages_msg_fk
references messages,
constraint block_messages_pk
unique (block, message)
);
`)
if err != nil {
return err
}
return tx.Commit()
}
func (st *storage) hasBlock(bh *types.BlockHeader) bool {
var exitsts bool
err := st.db.QueryRow(`select exists (select 1 FROM blocks where cid=?)`, bh.Cid().String()).Scan(&exitsts)
if err != nil {
log.Error(err)
return false
}
return exitsts
}
func (st *storage) storeHeaders(bhs []*types.BlockHeader) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, "timestamp") values (?, ?, ?, ?)`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Timestamp); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
defer stmt.Close()
for c, m := range msgs {
if _, err := stmt.Exec(
c.String(),
m.From.String(),
m.To.String(),
m.Nonce,
m.Value.String(),
m.GasPrice.String(),
m.GasLimit.String(),
m.Method,
m.Params,
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?)`)
if err != nil {
return err
}
defer stmt.Close()
for b, msgs := range incls {
for _, msg := range msgs {
if _, err := stmt.Exec(
b.String(),
msg.String(),
); err != nil {
return err
}
}
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -0,0 +1,152 @@
package main
import (
"container/list"
"context"
"fmt"
"sync"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
notifs, err := api.ChainNotify(ctx)
if err != nil {
panic(err)
}
go func() {
for notif := range notifs {
for _, change := range notif {
switch change.Type {
case store.HCCurrent:
fallthrough
case store.HCApply:
syncHead(ctx, api, st, change.Val)
case store.HCRevert:
log.Warnf("revert todo")
}
}
}
}()
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
var toSync []*types.BlockHeader
toVisit := list.New()
for _, header := range ts.Blocks() {
toVisit.PushBack(header)
}
for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
if !st.hasBlock(bh) {
toSync = append(toSync, bh)
}
if len(toSync)%500 == 0 {
log.Infof("todo: (%d) %s", len(toSync), bh.Cid())
}
if len(bh.Parents) == 0 {
break
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Error(err)
continue
}
for _, header := range pts.Blocks() {
toVisit.PushBack(header)
}
}
log.Infof("Syncing %d blocks", len(toSync))
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync); err != nil {
log.Error(err)
return
}
log.Infof("Getting messages")
msgs, incls := fetchMessages(ctx, api, toSync)
if err := st.storeMessages(msgs); err != nil {
log.Error(err)
return
}
if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err)
return
}
log.Infof("Getting actors")
// TODO: for now this assumes that actor can't be removed
/* aadrs, err := api.StateListActors(ctx, ts)
if err != nil {
return
}*/
log.Infof("Sync done")
}
func fetchMessages(ctx context.Context, api api.FullNode, toSync []*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
var lk sync.Mutex
messages := map[cid.Cid]*types.Message{}
inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs
throttle := make(chan struct{}, 50)
var wg sync.WaitGroup
for _, header := range toSync {
if header.Height%30 == 0 {
fmt.Printf("\rh: %d", header.Height)
}
throttle <- struct{}{}
wg.Add(1)
go func(header cid.Cid) {
defer wg.Done()
defer func() {
<-throttle
}()
msgs, err := api.ChainGetBlockMessages(ctx, header)
if err != nil {
log.Error(err)
return
}
vmm := make([]*types.Message, 0, len(msgs.Cids))
for _, m := range msgs.BlsMessages {
vmm = append(vmm, m)
}
for _, m := range msgs.SecpkMessages {
vmm = append(vmm, &m.Message)
}
lk.Lock()
for _, message := range vmm {
messages[message.Cid()] = message
inclusions[header] = append(inclusions[header], message.Cid())
}
lk.Unlock()
}(header.Cid())
}
wg.Wait()
return messages, inclusions
}

View File

@ -0,0 +1,29 @@
package main
import (
"reflect"
"sync"
)
func par(concurrency int, arr interface{}, f interface{}) {
throttle := make(chan struct{}, concurrency)
var wg sync.WaitGroup
varr := reflect.ValueOf(arr)
l := varr.Len()
rf := reflect.ValueOf(f)
wg.Add(l)
for i := 0; i < l; i++ {
throttle <- struct{}{}
go func() {
defer wg.Done()
defer func() {
<-throttle
}()
rf.Call([]reflect.Value{varr.Index(i)})
}()
}
}

1
go.mod
View File

@ -72,6 +72,7 @@ require (
github.com/libp2p/go-maddr-filter v0.0.5
github.com/mattn/go-isatty v0.0.9 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mattn/go-sqlite3 v1.12.0
github.com/miekg/dns v1.1.16 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771

2
go.sum
View File

@ -425,6 +425,8 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.12.0 h1:u/x3mp++qUxvYfulZ4HKOvVO0JWhk7HtE8lWhbGz/Do=
github.com/mattn/go-sqlite3 v1.12.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=