107 lines
2.3 KiB
Go
107 lines
2.3 KiB
Go
package main
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"net/http"
|
|
_ "net/http/pprof"
|
|
"os"
|
|
"strings"
|
|
|
|
_ "github.com/lib/pq"
|
|
|
|
"github.com/filecoin-project/go-jsonrpc"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"github.com/urfave/cli/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
lcli "github.com/filecoin-project/lotus/cli"
|
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
|
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler"
|
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
|
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
|
|
)
|
|
|
|
var runCmd = &cli.Command{
|
|
Name: "run",
|
|
Usage: "Start lotus chainwatch",
|
|
Flags: []cli.Flag{
|
|
&cli.IntFlag{
|
|
Name: "max-batch",
|
|
Value: 50,
|
|
},
|
|
},
|
|
Action: func(cctx *cli.Context) error {
|
|
go func() {
|
|
http.ListenAndServe(":6060", nil) //nolint:errcheck
|
|
}()
|
|
ll := cctx.String("log-level")
|
|
if err := logging.SetLogLevel("*", ll); err != nil {
|
|
return err
|
|
}
|
|
if err := logging.SetLogLevel("rpc", "error"); err != nil {
|
|
return err
|
|
}
|
|
|
|
var api api.FullNode
|
|
var closer jsonrpc.ClientCloser
|
|
var err error
|
|
if tokenMaddr := cctx.String("api"); tokenMaddr != "" {
|
|
toks := strings.Split(tokenMaddr, ":")
|
|
if len(toks) != 2 {
|
|
return fmt.Errorf("invalid api tokens, expected <token>:<maddr>, got: %s", tokenMaddr)
|
|
}
|
|
|
|
api, closer, err = util.GetFullNodeAPIUsingCredentials(cctx.Context, toks[1], toks[0])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
api, closer, err = lcli.GetFullNodeAPI(cctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
defer closer()
|
|
ctx := lcli.ReqContext(cctx)
|
|
|
|
v, err := api.Version(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Remote version: %s", v.Version)
|
|
|
|
maxBatch := cctx.Int("max-batch")
|
|
|
|
db, err := sql.Open("postgres", cctx.String("db"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err := db.Close(); err != nil {
|
|
log.Errorw("Failed to close database", "error", err)
|
|
}
|
|
}()
|
|
|
|
if err := db.Ping(); err != nil {
|
|
return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
|
|
}
|
|
db.SetMaxOpenConns(1350)
|
|
|
|
sync := syncer.NewSyncer(db, api)
|
|
sync.Start(ctx)
|
|
|
|
proc := processor.NewProcessor(ctx, db, api, maxBatch)
|
|
proc.Start(ctx)
|
|
|
|
sched := scheduler.PrepareScheduler(db)
|
|
sched.Start(ctx)
|
|
|
|
<-ctx.Done()
|
|
os.Exit(0)
|
|
return nil
|
|
},
|
|
}
|