From c0bd13ecedf2e60008b13cf871f5139e80308c4b Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Wed, 30 Aug 2023 12:11:09 -0400 Subject: [PATCH] WIP --- node/modules/storageminer.go | 13 ++++++++++++ storage/wdpost/wdpost_changehandler.go | 18 +++++++++------- storage/wdpost/wdpost_sched.go | 29 +++++++++++++++----------- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 533ef6c9b..6cc5d71ac 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "net/http" "os" "path/filepath" @@ -322,6 +323,18 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func return nil, err } + wdPostTask := wdpost.NewWdPostTask(db) + tasks := []harmonytask.TaskInterface{wdPostTask} + + taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") + if err != nil { + return nil, xerrors.Errorf("failed to create task engine: %w", err) + } + //handler := gin.New() + // + //taskEngine.ApplyHttpHandlers(handler.Group("/")) + defer taskEngine.GracefullyTerminate(time.Hour) + lc.Append(fx.Hook{ OnStart: func(context.Context) error { go fps.Run(ctx) diff --git a/storage/wdpost/wdpost_changehandler.go b/storage/wdpost/wdpost_changehandler.go index 32baa6165..dd6ac0fb4 100644 --- a/storage/wdpost/wdpost_changehandler.go +++ b/storage/wdpost/wdpost_changehandler.go @@ -2,7 +2,6 @@ package wdpost import ( "context" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "sync" "github.com/filecoin-project/go-address" @@ -32,18 +31,25 @@ type wdPoStCommands interface { recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) } +type changeHandlerIface interface { + start() + update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error + shutdown() + currentTSDI() (*types.TipSet, *dline.Info) +} + +var _ changeHandlerIface = &changeHandler{} + type changeHandler struct { api wdPoStCommands actor address.Address proveHdlr *proveHandler submitHdlr *submitHandler - - db *harmonydb.DB } -func newChangeHandler(api wdPoStCommands, actor address.Address, db *harmonydb.DB) *changeHandler { +func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler { posts := newPostsCache() - p := newProver(api, posts, db) + p := newProver(api, posts) s := newSubmitter(api, posts) return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s} } @@ -172,7 +178,6 @@ type proveHandler struct { func newProver( api wdPoStCommands, posts *postsCache, - db *harmonydb.DB, ) *proveHandler { ctx, cancel := context.WithCancel(context.Background()) return &proveHandler{ @@ -182,7 +187,6 @@ func newProver( hcs: make(chan *headChange), shutdownCtx: ctx, shutdown: cancel, - wdPostTask: NewWdPostTask(db), } } diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index 0a8b5b6c7..cf04cf5af 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -77,8 +77,8 @@ type WindowPoStScheduler struct { maxPartitionsPerPostMessage int maxPartitionsPerRecoveryMessage int singleRecoveringPartitionPerPostMessage bool - ch *changeHandler - ch2 *changeHandler2 + ch changeHandlerIface + //ch2 *changeHandler2 actor address.Address @@ -140,13 +140,17 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { *WindowPoStScheduler }{s.api, s} - s.ch = newChangeHandler(callbacks, s.actor, s.db) - defer s.ch.shutdown() - s.ch.start() + run_on_lotus_provider := true - s.ch2 = newChangeHandler2(callbacks, s.actor, s.db) - defer s.ch2.shutdown() - s.ch2.start() + if !run_on_lotus_provider { + s.ch = newChangeHandler(callbacks, s.actor) + defer s.ch.shutdown() + s.ch.start() + } else { + s.ch = newChangeHandler2(callbacks, s.actor, s.db) + defer s.ch.shutdown() + s.ch.start() + } var ( notifs <-chan []*api.HeadChange @@ -223,6 +227,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) { + log.Errorf("WindowPoStScheduler.update() called with revert: %v, apply: %v", revert, apply) if apply == nil { log.Error("no new tipset in window post WindowPoStScheduler.update") return @@ -232,10 +237,10 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T log.Errorf("handling head updates in window post sched: %+v", err) } - err = s.ch2.update(ctx, revert, apply) - if err != nil { - log.Errorf("handling head updates in window post sched: %+v", err) - } + //err = s.ch2.update(ctx, revert, apply) + //if err != nil { + // log.Errorf("handling head updates in window post sched: %+v", err) + //} } // onAbort is called when generating proofs or submitting proofs is aborted