fix(log): Move metrics and stage updates to debug; Add --log-level switch
This commit is contained in:
parent
033cfa8f47
commit
f2f797f088
@ -6,10 +6,10 @@ import (
|
|||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
var dotCmd = &cli.Command{
|
var dotCmd = &cli.Command{
|
||||||
@ -17,6 +17,11 @@ var dotCmd = &cli.Command{
|
|||||||
Usage: "generate dot graphs",
|
Usage: "generate dot graphs",
|
||||||
ArgsUsage: "<minHeight> <toseeHeight>",
|
ArgsUsage: "<minHeight> <toseeHeight>",
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
|
ll := cctx.String("log-level")
|
||||||
|
if err := logging.SetLogLevel("*", ll); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
db, err := sql.Open("postgres", cctx.String("db"))
|
db, err := sql.Open("postgres", cctx.String("db"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1,37 +1,22 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
_ "github.com/lib/pq"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/urfave/cli/v2"
|
"github.com/urfave/cli/v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
|
|
||||||
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = logging.Logger("chainwatch")
|
var log = logging.Logger("chainwatch")
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
_ = logging.SetLogLevel("*", "INFO")
|
if err := logging.SetLogLevel("*", "info"); err != nil {
|
||||||
if err := logging.SetLogLevel("rpc", "error"); err != nil {
|
log.Fatal(err)
|
||||||
panic(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Starting chainwatch")
|
log.Info("Starting chainwatch")
|
||||||
|
|
||||||
local := []*cli.Command{
|
|
||||||
dotCmd,
|
|
||||||
runCmd,
|
|
||||||
}
|
|
||||||
|
|
||||||
app := &cli.App{
|
app := &cli.App{
|
||||||
Name: "lotus-chainwatch",
|
Name: "lotus-chainwatch",
|
||||||
Usage: "Devnet token distribution utility",
|
Usage: "Devnet token distribution utility",
|
||||||
@ -47,70 +32,19 @@ func main() {
|
|||||||
EnvVars: []string{"LOTUS_DB"},
|
EnvVars: []string{"LOTUS_DB"},
|
||||||
Value: "",
|
Value: "",
|
||||||
},
|
},
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "log-level",
|
||||||
|
EnvVars: []string{"GOLOG_LOG_LEVEL"},
|
||||||
|
Value: "info",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Commands: []*cli.Command{
|
||||||
|
dotCmd,
|
||||||
|
runCmd,
|
||||||
},
|
},
|
||||||
|
|
||||||
Commands: local,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := app.Run(os.Args); err != nil {
|
if err := app.Run(os.Args); err != nil {
|
||||||
log.Warnf("%+v", err)
|
log.Fatal(err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var runCmd = &cli.Command{
|
|
||||||
Name: "run",
|
|
||||||
Usage: "Start lotus chainwatch",
|
|
||||||
Flags: []cli.Flag{
|
|
||||||
&cli.StringFlag{
|
|
||||||
Name: "front",
|
|
||||||
Value: "127.0.0.1:8418",
|
|
||||||
},
|
|
||||||
&cli.IntFlag{
|
|
||||||
Name: "max-batch",
|
|
||||||
Value: 1000,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
Action: func(cctx *cli.Context) error {
|
|
||||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer closer()
|
|
||||||
ctx := lcli.ReqContext(cctx)
|
|
||||||
|
|
||||||
v, err := api.Version(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Remote version: %s", v.Version)
|
|
||||||
|
|
||||||
maxBatch := cctx.Int("max-batch")
|
|
||||||
|
|
||||||
db, err := sql.Open("postgres", cctx.String("db"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer func() {
|
|
||||||
if err := db.Close(); err != nil {
|
|
||||||
log.Errorw("Failed to close database", "error", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err := db.Ping(); err != nil {
|
|
||||||
return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
|
|
||||||
}
|
|
||||||
db.SetMaxOpenConns(1350)
|
|
||||||
|
|
||||||
sync := syncer.NewSyncer(db, api)
|
|
||||||
sync.Start(ctx)
|
|
||||||
|
|
||||||
proc := processor.NewProcessor(db, api, maxBatch)
|
|
||||||
proc.Start(ctx)
|
|
||||||
|
|
||||||
<-ctx.Done()
|
|
||||||
os.Exit(0)
|
|
||||||
return nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
@ -128,7 +128,7 @@ func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[ci
|
|||||||
func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error {
|
func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Actor Addresses", "duration", time.Since(start).String())
|
log.Debugw("Stored Actor Addresses", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
addressToID := map[address.Address]address.Address{}
|
addressToID := map[address.Address]address.Address{}
|
||||||
@ -217,7 +217,7 @@ create temp table iam (like id_address_map excluding constraints) on commit drop
|
|||||||
func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
|
func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Actor Heads", "duration", time.Since(start).String())
|
log.Debugw("Stored Actor Heads", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
// Basic
|
// Basic
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
@ -259,7 +259,7 @@ func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
|
|||||||
func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
|
func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Actor States", "duration", time.Since(start).String())
|
log.Debugw("Stored Actor States", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
// States
|
// States
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
|
@ -91,7 +91,7 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
|
|||||||
func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) {
|
func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Processed Market", "duration", time.Since(start).String())
|
log.Debugw("Processed Market", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var out []marketActorInfo
|
var out []marketActorInfo
|
||||||
@ -107,7 +107,7 @@ func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]
|
|||||||
func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error {
|
func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Persisted Market", "duration", time.Since(start).String())
|
log.Debugw("Persisted Market", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
grp, ctx := errgroup.WithContext(ctx)
|
grp, ctx := errgroup.WithContext(ctx)
|
||||||
@ -140,7 +140,7 @@ func (p *Processor) updateMarket(ctx context.Context, info []marketActorInfo) er
|
|||||||
func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error {
|
func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Market Deal States", "duration", time.Since(start).String())
|
log.Debugw("Stored Market Deal States", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -191,7 +191,7 @@ func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) err
|
|||||||
func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error {
|
func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String())
|
log.Debugw("Stored Market Deal Proposals", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -255,7 +255,7 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip
|
|||||||
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
|
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String())
|
log.Debugw("Updated Market Deal Proposals", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
pred := state.NewStatePredicates(p.node)
|
pred := state.NewStatePredicates(p.node)
|
||||||
|
|
||||||
|
@ -120,7 +120,7 @@ func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[c
|
|||||||
func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
|
func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Persisted Receipts", "duration", time.Since(start).String())
|
log.Debugw("Persisted Receipts", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -164,7 +164,7 @@ create temp table recs (like receipts excluding constraints) on commit drop;
|
|||||||
func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
|
func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Persisted Message Inclusions", "duration", time.Since(start).String())
|
log.Debugw("Persisted Message Inclusions", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -161,7 +161,7 @@ func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips)
|
|||||||
func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) {
|
func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Processed Miners", "duration", time.Since(start).String())
|
log.Debugw("Processed Miners", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var out []minerActorInfo
|
var out []minerActorInfo
|
||||||
@ -208,7 +208,7 @@ func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSe
|
|||||||
func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error {
|
func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Persisted Miners", "duration", time.Since(start).String())
|
log.Debugw("Persisted Miners", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
grp, _ := errgroup.WithContext(ctx)
|
grp, _ := errgroup.WithContext(ctx)
|
||||||
@ -247,7 +247,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
|
|||||||
func (p *Processor) storeMinersActorState(miners []minerActorInfo) error {
|
func (p *Processor) storeMinersActorState(miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Miners Actor State", "duration", time.Since(start).String())
|
log.Debugw("Stored Miners Actor State", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
@ -303,7 +303,7 @@ func (p *Processor) storeMinersActorState(miners []minerActorInfo) error {
|
|||||||
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
|
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Miners Power", "duration", time.Since(start).String())
|
log.Debugw("Stored Miners Power", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
@ -350,7 +350,7 @@ func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
|
|||||||
func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
|
func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Miners Sector State", "duration", time.Since(start).String())
|
log.Debugw("Stored Miners Sector State", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
@ -412,7 +412,7 @@ func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
|
|||||||
func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error {
|
func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Stored Miners Sector Heads", "duration", time.Since(start).String())
|
log.Debugw("Stored Miners Sector Heads", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
@ -460,10 +460,10 @@ func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error {
|
func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error {
|
||||||
log.Infow("Updating Miners Sectors", "#miners", len(miners))
|
log.Debugw("Updating Miners Sectors", "#miners", len(miners))
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Updated Miners Sectors", "duration", time.Since(start).String())
|
log.Debugw("Updated Miners Sectors", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pred := state.NewStatePredicates(p.node)
|
pred := state.NewStatePredicates(p.node)
|
||||||
@ -530,7 +530,7 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor
|
|||||||
minerID: m.common.addr,
|
minerID: m.common.addr,
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration)
|
log.Debugw("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration)
|
||||||
}
|
}
|
||||||
curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey)
|
curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -538,13 +538,13 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, removed := range changes.Removed {
|
for _, removed := range changes.Removed {
|
||||||
log.Infow("removed", "miner", m.common.addr)
|
log.Debugw("removed", "miner", m.common.addr)
|
||||||
// decide if they were terminated or extended
|
// decide if they were terminated or extended
|
||||||
if removed.Info.Expiration > curTs.Height() {
|
if removed.Info.Expiration > curTs.Height() {
|
||||||
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
|
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Infow("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height())
|
log.Debugw("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height())
|
||||||
sectorUpdatesCh <- sectorUpdate{
|
sectorUpdatesCh <- sectorUpdate{
|
||||||
terminationEpoch: curTs.Height(),
|
terminationEpoch: curTs.Height(),
|
||||||
terminated: true,
|
terminated: true,
|
||||||
@ -557,7 +557,7 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor
|
|||||||
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
|
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Infow("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height())
|
log.Debugw("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height())
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, added := range changes.Added {
|
for _, added := range changes.Added {
|
||||||
|
@ -81,7 +81,7 @@ func (p *Processor) setupSchemas() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) Start(ctx context.Context) {
|
func (p *Processor) Start(ctx context.Context) {
|
||||||
log.Info("Starting Processor")
|
log.Debug("Starting Processor")
|
||||||
|
|
||||||
if err := p.setupSchemas(); err != nil {
|
if err := p.setupSchemas(); err != nil {
|
||||||
log.Fatalw("Failed to setup processor", "error", err)
|
log.Fatalw("Failed to setup processor", "error", err)
|
||||||
@ -94,7 +94,7 @@ func (p *Processor) Start(ctx context.Context) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Infow("Stopping Processor...")
|
log.Debugw("Stopping Processor...")
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
toProcess, err := p.unprocessedBlocks(ctx, p.batch)
|
toProcess, err := p.unprocessedBlocks(ctx, p.batch)
|
||||||
@ -176,7 +176,7 @@ func (p *Processor) refreshViews() error {
|
|||||||
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) {
|
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Collected Actor Changes", "duration", time.Since(start).String())
|
log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
// ActorCode - > tipset->[]actorInfo
|
// ActorCode - > tipset->[]actorInfo
|
||||||
out := map[cid.Cid]ActorTips{}
|
out := map[cid.Cid]ActorTips{}
|
||||||
@ -191,7 +191,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
|
parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
|
||||||
paDone++
|
paDone++
|
||||||
if paDone%100 == 0 {
|
if paDone%100 == 0 {
|
||||||
log.Infow("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess))
|
log.Debugw("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess))
|
||||||
}
|
}
|
||||||
|
|
||||||
pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
|
pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
|
||||||
@ -255,7 +255,7 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
|||||||
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
|
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Gathered Blocks to process", "duration", time.Since(start).String())
|
log.Debugw("Gathered Blocks to process", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
rows, err := p.db.Query(`
|
rows, err := p.db.Query(`
|
||||||
with toProcess as (
|
with toProcess as (
|
||||||
@ -299,7 +299,7 @@ where rnk <= $1
|
|||||||
func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error {
|
func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Marked blocks as Processed", "duration", time.Since(start).String())
|
log.Debugw("Marked blocks as Processed", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
tx, err := p.db.Begin()
|
tx, err := p.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -89,7 +89,7 @@ func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTip
|
|||||||
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) {
|
func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips) ([]rewardActorInfo, error) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Processed Reward Actors", "duration", time.Since(start).String())
|
log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
var out []rewardActorInfo
|
var out []rewardActorInfo
|
||||||
@ -125,7 +125,7 @@ func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTip
|
|||||||
func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error {
|
func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infow("Persisted Reward Actors", "duration", time.Since(start).String())
|
log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String())
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := p.storeChainPower(rewards); err != nil {
|
if err := p.storeChainPower(rewards); err != nil {
|
||||||
|
77
cmd/lotus-chainwatch/run.go
Normal file
77
cmd/lotus-chainwatch/run.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
_ "github.com/lib/pq"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
"github.com/urfave/cli/v2"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
|
||||||
|
)
|
||||||
|
|
||||||
|
var runCmd = &cli.Command{
|
||||||
|
Name: "run",
|
||||||
|
Usage: "Start lotus chainwatch",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "front",
|
||||||
|
Value: "127.0.0.1:8418",
|
||||||
|
},
|
||||||
|
&cli.IntFlag{
|
||||||
|
Name: "max-batch",
|
||||||
|
Value: 1000,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
ll := cctx.String("log-level")
|
||||||
|
if err := logging.SetLogLevel("*", ll); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
v, err := api.Version(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Remote version: %s", v.Version)
|
||||||
|
|
||||||
|
maxBatch := cctx.Int("max-batch")
|
||||||
|
|
||||||
|
db, err := sql.Open("postgres", cctx.String("db"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := db.Close(); err != nil {
|
||||||
|
log.Errorw("Failed to close database", "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := db.Ping(); err != nil {
|
||||||
|
return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
|
||||||
|
}
|
||||||
|
db.SetMaxOpenConns(1350)
|
||||||
|
|
||||||
|
sync := syncer.NewSyncer(db, api)
|
||||||
|
sync.Start(ctx)
|
||||||
|
|
||||||
|
proc := processor.NewProcessor(db, api, maxBatch)
|
||||||
|
proc.Start(ctx)
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
os.Exit(0)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
@ -133,7 +133,7 @@ create index if not exists state_heights_parentstateroot_index
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Syncer) Start(ctx context.Context) {
|
func (s *Syncer) Start(ctx context.Context) {
|
||||||
log.Info("Starting Syncer")
|
log.Debug("Starting Syncer")
|
||||||
|
|
||||||
if err := s.setupSchemas(); err != nil {
|
if err := s.setupSchemas(); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
@ -217,7 +217,7 @@ func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since t
|
|||||||
|
|
||||||
toSync[bh.Cid()] = bh
|
toSync[bh.Cid()] = bh
|
||||||
if len(toSync)%500 == 10 {
|
if len(toSync)%500 == 10 {
|
||||||
log.Infow("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height)
|
log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(bh.Parents) == 0 {
|
if len(bh.Parents) == 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user