refactor: wire up new processor and syncer

This commit is contained in:
frrist 2020-07-14 13:04:27 -07:00
parent 41f4f1fd83
commit 9a53bf8f83
2 changed files with 69 additions and 26 deletions

View File

@ -1,10 +1,13 @@
package main package main
import ( import (
"database/sql"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"strconv" "strconv"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
@ -14,10 +17,19 @@ var dotCmd = &cli.Command{
Usage: "generate dot graphs", Usage: "generate dot graphs",
ArgsUsage: "<minHeight> <toseeHeight>", ArgsUsage: "<minHeight> <toseeHeight>",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
st, err := openStorage(cctx.String("db")) db, err := sql.Open("postgres", cctx.String("db"))
if err != nil { if err != nil {
return err return err
} }
defer func() {
if err := db.Close(); err != nil {
log.Errorw("Failed to close database", "error", err)
}
}()
if err := db.Ping(); err != nil {
return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
}
minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32) minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32)
if err != nil { if err != nil {
@ -29,7 +41,7 @@ var dotCmd = &cli.Command{
} }
maxH := minH + tosee maxH := minH + tosee
res, err := st.db.Query(`select block, parent, b.miner, b.height, p.height from block_parents res, err := db.Query(`select block, parent, b.miner, b.height, p.height from block_parents
inner join blocks b on block_parents.block = b.cid inner join blocks b on block_parents.block = b.cid
inner join blocks p on block_parents.parent = p.cid inner join blocks p on block_parents.parent = p.cid
where b.height > $1 and b.height < $2`, minH, maxH) where b.height > $1 and b.height < $2`, minH, maxH)
@ -40,7 +52,10 @@ where b.height > $1 and b.height < $2`, minH, maxH)
fmt.Println("digraph D {") fmt.Println("digraph D {")
hl := st.hasList() hl, err := syncedBlocks(db)
if err != nil {
log.Fatal(err)
}
for res.Next() { for res.Next() {
var block, parent, miner string var block, parent, miner string
@ -85,3 +100,27 @@ where b.height > $1 and b.height < $2`, minH, maxH)
return nil return nil
}, },
} }
func syncedBlocks(db *sql.DB) (map[cid.Cid]struct{}, error) {
// timestamp is used to return a configurable amount of rows based on when they were last added.
rws, err := db.Query(`select cid FROM blocks_synced`)
if err != nil {
return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
}
out := map[cid.Cid]struct{}{}
for rws.Next() {
var c string
if err := rws.Scan(&c); err != nil {
return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err)
}
ci, err := cid.Parse(c)
if err != nil {
return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err)
}
out[ci] = struct{}{}
}
return out, nil
}

View File

@ -1,17 +1,20 @@
package main package main
import ( import (
"fmt" "database/sql"
"net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
logging "github.com/ipfs/go-log/v2" _ "github.com/lib/pq"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
) )
var log = logging.Logger("chainwatch") var log = logging.Logger("chainwatch")
@ -25,8 +28,8 @@ func main() {
log.Info("Starting chainwatch") log.Info("Starting chainwatch")
local := []*cli.Command{ local := []*cli.Command{
runCmd,
dotCmd, dotCmd,
runCmd,
} }
app := &cli.App{ app := &cli.App{
@ -85,28 +88,29 @@ var runCmd = &cli.Command{
maxBatch := cctx.Int("max-batch") maxBatch := cctx.Int("max-batch")
st, err := openStorage(cctx.String("db")) db, err := sql.Open("postgres", cctx.String("db"))
if err != nil { if err != nil {
return err return err
} }
defer st.close() //nolint:errcheck defer func() {
if err := db.Close(); err != nil {
runSyncer(ctx, api, st, maxBatch) log.Errorw("Failed to close database", "error", err)
h, err := newHandler(api, st)
if err != nil {
return xerrors.Errorf("handler setup: %w", err)
} }
http.Handle("/", h)
fmt.Printf("Open http://%s\n", cctx.String("front"))
go func() {
<-ctx.Done()
os.Exit(0)
}() }()
return http.ListenAndServe(cctx.String("front"), nil) if err := db.Ping(); err != nil {
return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
}
db.SetMaxOpenConns(1350)
sync := syncer.NewSyncer(db, api)
sync.Start(ctx)
proc := processor.NewProcessor(db, api, maxBatch)
proc.Start(ctx)
<-ctx.Done()
os.Exit(0)
return nil
}, },
} }