diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go index 87dab62ad..d40b31d86 100644 --- a/cmd/lotus-chainwatch/dot.go +++ b/cmd/lotus-chainwatch/dot.go @@ -1,10 +1,13 @@ package main import ( + "database/sql" "fmt" "hash/crc32" "strconv" + "golang.org/x/xerrors" + "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" ) @@ -14,10 +17,19 @@ var dotCmd = &cli.Command{ Usage: "generate dot graphs", ArgsUsage: " ", Action: func(cctx *cli.Context) error { - st, err := openStorage(cctx.String("db")) + db, err := sql.Open("postgres", cctx.String("db")) if err != nil { return err } + defer func() { + if err := db.Close(); err != nil { + log.Errorw("Failed to close database", "error", err) + } + }() + + if err := db.Ping(); err != nil { + return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err) + } minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32) if err != nil { @@ -29,7 +41,7 @@ var dotCmd = &cli.Command{ } maxH := minH + tosee - res, err := st.db.Query(`select block, parent, b.miner, b.height, p.height from block_parents + res, err := db.Query(`select block, parent, b.miner, b.height, p.height from block_parents inner join blocks b on block_parents.block = b.cid inner join blocks p on block_parents.parent = p.cid where b.height > $1 and b.height < $2`, minH, maxH) @@ -40,7 +52,10 @@ where b.height > $1 and b.height < $2`, minH, maxH) fmt.Println("digraph D {") - hl := st.hasList() + hl, err := syncedBlocks(db) + if err != nil { + log.Fatal(err) + } for res.Next() { var block, parent, miner string @@ -85,3 +100,27 @@ where b.height > $1 and b.height < $2`, minH, maxH) return nil }, } + +func syncedBlocks(db *sql.DB) (map[cid.Cid]struct{}, error) { + // timestamp is used to return a configurable amount of rows based on when they were last added. + rws, err := db.Query(`select cid FROM blocks_synced`) + if err != nil { + return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) + } + out := map[cid.Cid]struct{}{} + + for rws.Next() { + var c string + if err := rws.Scan(&c); err != nil { + return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) + } + + ci, err := cid.Parse(c) + if err != nil { + return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) + } + + out[ci] = struct{}{} + } + return out, nil +} diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index d3c5a570b..1db95cecf 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -1,17 +1,20 @@ package main import ( - "fmt" - "net/http" + "database/sql" _ "net/http/pprof" "os" - logging "github.com/ipfs/go-log/v2" - "github.com/urfave/cli/v2" + _ "github.com/lib/pq" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" + logging "github.com/ipfs/go-log/v2" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor" + "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer" ) var log = logging.Logger("chainwatch") @@ -25,8 +28,8 @@ func main() { log.Info("Starting chainwatch") local := []*cli.Command{ - runCmd, dotCmd, + runCmd, } app := &cli.App{ @@ -85,28 +88,29 @@ var runCmd = &cli.Command{ maxBatch := cctx.Int("max-batch") - st, err := openStorage(cctx.String("db")) + db, err := sql.Open("postgres", cctx.String("db")) if err != nil { return err } - defer st.close() //nolint:errcheck - - runSyncer(ctx, api, st, maxBatch) - - h, err := newHandler(api, st) - if err != nil { - return xerrors.Errorf("handler setup: %w", err) - } - - http.Handle("/", h) - - fmt.Printf("Open http://%s\n", cctx.String("front")) - - go func() { - <-ctx.Done() - os.Exit(0) + defer func() { + if err := db.Close(); err != nil { + log.Errorw("Failed to close database", "error", err) + } }() - return http.ListenAndServe(cctx.String("front"), nil) + if err := db.Ping(); err != nil { + return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err) + } + db.SetMaxOpenConns(1350) + + sync := syncer.NewSyncer(db, api) + sync.Start(ctx) + + proc := processor.NewProcessor(db, api, maxBatch) + proc.Start(ctx) + + <-ctx.Done() + os.Exit(0) + return nil }, }