diff --git a/cmd/lotus-chainwatch/run.go b/cmd/lotus-chainwatch/run.go index 5df81c26c..0ed08cf8d 100644 --- a/cmd/lotus-chainwatch/run.go +++ b/cmd/lotus-chainwatch/run.go @@ -14,6 +14,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor" + "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler" "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer" ) @@ -75,6 +76,9 @@ var runCmd = &cli.Command{ proc := processor.NewProcessor(db, api, maxBatch) proc.Start(ctx) + sched := scheduler.PrepareScheduler(db) + sched.Start(ctx) + <-ctx.Done() os.Exit(0) return nil diff --git a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go new file mode 100644 index 000000000..50bb561a1 --- /dev/null +++ b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go @@ -0,0 +1,29 @@ +package scheduler + +import ( + "context" + "database/sql" + "time" + + "golang.org/x/xerrors" +) + +func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error { + select { + case <-ctx.Done(): + return nil + default: + } + + t := time.Now() + defer func() { + log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String()) + }() + + _, err := db.Exec("REFRESH MATERIALIZED VIEW top_miners_by_base_reward;") + if err != nil { + return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err) + } + + return nil +} diff --git a/cmd/lotus-chainwatch/scheduler/scheduler.go b/cmd/lotus-chainwatch/scheduler/scheduler.go new file mode 100644 index 000000000..c5c93c310 --- /dev/null +++ b/cmd/lotus-chainwatch/scheduler/scheduler.go @@ -0,0 +1,47 @@ +package scheduler + +import ( + "context" + "database/sql" + "time" + + logging "github.com/ipfs/go-log/v2" +) + +var log = logging.Logger("scheduler") + +// Scheduler manages the execution of jobs triggered +// by tickers. Not externally configuable at runtime. +type Scheduler struct { + db *sql.DB +} + +// PrepareScheduler returns a ready-to-run Scheduler +func PrepareScheduler(db *sql.DB) *Scheduler { + return &Scheduler{db} +} + +// Start the scheduler jobs at the defined intervals +func (s *Scheduler) Start(ctx context.Context) { + log.Debug("Starting Scheduler") + + go func() { + // run once on start after schema has initialized + time.Sleep(5 * time.Second) + if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { + log.Errorf(err.Error()) + } + refreshTopMinerCh := time.NewTicker(6 * time.Hour) + defer refreshTopMinerCh.Stop() + for { + select { + case <-refreshTopMinerCh.C: + if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { + log.Errorf(err.Error()) + } + case <-ctx.Done(): + return + } + } + }() +}