Merge pull request #2542 from filecoin-project/feat/chainwatch/ticker-jobs
feat(chainwatch): Ticker triggers view refresh independent of processing
This commit is contained in:
commit
1d2901cb98
@ -14,6 +14,7 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
|
||||||
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler"
|
||||||
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
|
"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -75,6 +76,9 @@ var runCmd = &cli.Command{
|
|||||||
proc := processor.NewProcessor(db, api, maxBatch)
|
proc := processor.NewProcessor(db, api, maxBatch)
|
||||||
proc.Start(ctx)
|
proc.Start(ctx)
|
||||||
|
|
||||||
|
sched := scheduler.PrepareScheduler(db)
|
||||||
|
sched.Start(ctx)
|
||||||
|
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
return nil
|
return nil
|
||||||
|
@ -0,0 +1,29 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
t := time.Now()
|
||||||
|
defer func() {
|
||||||
|
log.Debugw("refresh top_miners_by_base_reward", "duration", time.Since(t).String())
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err := db.Exec("REFRESH MATERIALIZED VIEW top_miners_by_base_reward;")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
47
cmd/lotus-chainwatch/scheduler/scheduler.go
Normal file
47
cmd/lotus-chainwatch/scheduler/scheduler.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("scheduler")
|
||||||
|
|
||||||
|
// Scheduler manages the execution of jobs triggered
|
||||||
|
// by tickers. Not externally configuable at runtime.
|
||||||
|
type Scheduler struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrepareScheduler returns a ready-to-run Scheduler
|
||||||
|
func PrepareScheduler(db *sql.DB) *Scheduler {
|
||||||
|
return &Scheduler{db}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the scheduler jobs at the defined intervals
|
||||||
|
func (s *Scheduler) Start(ctx context.Context) {
|
||||||
|
log.Debug("Starting Scheduler")
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// run once on start after schema has initialized
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
|
||||||
|
log.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
refreshTopMinerCh := time.NewTicker(6 * time.Hour)
|
||||||
|
defer refreshTopMinerCh.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-refreshTopMinerCh.C:
|
||||||
|
if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
|
||||||
|
log.Errorf(err.Error())
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user