diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index eaef45db7..cd955011b 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -172,7 +172,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o return err } - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, nil, deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go index b81814423..1bf96431c 100644 --- a/cmd/lotus-provider/tasks/tasks.go +++ b/cmd/lotus-provider/tasks/tasks.go @@ -3,6 +3,7 @@ package tasks import ( "context" + "github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/lpffi" "github.com/filecoin-project/lotus/provider/lpseal" @@ -34,6 +35,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task sender, sendTask := lpmessage.NewSender(full, full, db) activeTasks = append(activeTasks, sendTask) + chainSched := chainsched.New(full) + /////////////////////////////////////////////////////////////////////// ///// Task Selection /////////////////////////////////////////////////////////////////////// @@ -42,7 +45,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task if cfg.Subsystems.EnableWindowPost { wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, - as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) + chainSched, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) if err != nil { return nil, err } @@ -55,9 +58,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task } } + hasAnySealingTask := cfg.Subsystems.EnableSealSDR { // Sealing - hasAnySealingTask := cfg.Subsystems.EnableSealSDR var sp *lpseal.SealPoller var slr *lpffi.SealCalls @@ -76,10 +79,31 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks) activeTasks = append(activeTasks, treesTask) } + if cfg.Subsystems.EnableSendPrecommitMsg { + precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, cfg.Fees.MaxPreCommitGasFee) + activeTasks = append(activeTasks, precommitTask) + } } log.Infow("This lotus_provider instance handles", "miner_addresses", maddrs, "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) - return harmonytask.New(db, activeTasks, dependencies.ListenAddr) + ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr) + if err != nil { + return nil, err + } + + if hasAnySealingTask { + watcher, err := lpmessage.NewMessageWatcher(db, ht, chainSched, full) + if err != nil { + return nil, err + } + _ = watcher + } + + if cfg.Subsystems.EnableWindowPost || hasAnySealingTask { + go chainSched.Run(ctx) + } + + return ht, nil } diff --git a/lib/harmony/harmonydb/sql/20231225-message-waits.sql b/lib/harmony/harmonydb/sql/20231225-message-waits.sql index f3cfc1b6d..4143f3a56 100644 --- a/lib/harmony/harmonydb/sql/20231225-message-waits.sql +++ b/lib/harmony/harmonydb/sql/20231225-message-waits.sql @@ -1,5 +1,5 @@ create table message_waits ( - signed_message_cid text primary key references message_sends (signed_cid), + signed_message_cid text primary key, waiter_machine_id int references harmony_machines (id) on delete set null, executed_tsk_cid text, diff --git a/node/config/types.go b/node/config/types.go index e91d1dcf5..e2924de89 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -114,6 +114,10 @@ type ProviderSubsystemsConfig struct { // In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1). EnableSealSDRTrees bool SealSDRTreesMaxTasks int + + // EnableSendPrecommitMsg enables the sending of precommit messages to the chain + // from this lotus-provider instance. + EnableSendPrecommitMsg bool } type DAGStoreConfig struct { diff --git a/provider/builder.go b/provider/builder.go index cff387970..d49333965 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -20,12 +20,10 @@ import ( //var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, - api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender, + api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender, chainSched *chainsched.ProviderChainSched, as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) { - chainSched := chainsched.New(api) - // todo config ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) @@ -44,7 +42,5 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co return nil, nil, nil, err } - go chainSched.Run(ctx) - return computeTask, submitTask, recoverTask, nil } diff --git a/provider/lpmessage/watch.go b/provider/lpmessage/watch.go index 5f98a9a0c..ee8d33237 100644 --- a/provider/lpmessage/watch.go +++ b/provider/lpmessage/watch.go @@ -24,7 +24,7 @@ type MessageWaiterApi interface { ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) } -type MessageWaiter struct { +type MessageWatcher struct { db *harmonydb.DB ht *harmonytask.TaskEngine api MessageWaiterApi @@ -35,8 +35,8 @@ type MessageWaiter struct { bestTs atomic.Pointer[types.TipSetKey] } -func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched, api MessageWaiterApi) (*MessageWaiter, error) { - mw := &MessageWaiter{ +func NewMessageWatcher(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched, api MessageWaiterApi) (*MessageWatcher, error) { + mw := &MessageWatcher{ db: db, ht: ht, api: api, @@ -51,7 +51,7 @@ func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsc return mw, nil } -func (mw *MessageWaiter) run() { +func (mw *MessageWatcher) run() { defer close(mw.stopped) for { @@ -65,7 +65,7 @@ func (mw *MessageWaiter) run() { } } -func (mw *MessageWaiter) update() { +func (mw *MessageWatcher) update() { ctx := context.Background() tsk := *mw.bestTs.Load() @@ -189,7 +189,7 @@ func (mw *MessageWaiter) update() { } } -func (mw *MessageWaiter) Stop(ctx context.Context) error { +func (mw *MessageWatcher) Stop(ctx context.Context) error { close(mw.stopping) select { case <-mw.stopped: @@ -200,7 +200,7 @@ func (mw *MessageWaiter) Stop(ctx context.Context) error { return nil } -func (mw *MessageWaiter) processHeadChange(ctx context.Context, revert *types.TipSet, apply *types.TipSet) error { +func (mw *MessageWatcher) processHeadChange(ctx context.Context, revert *types.TipSet, apply *types.TipSet) error { best := apply.Key() mw.bestTs.Store(&best) select { diff --git a/provider/lpseal/poller.go b/provider/lpseal/poller.go index 2d2b035a1..f1ba84728 100644 --- a/provider/lpseal/poller.go +++ b/provider/lpseal/poller.go @@ -70,8 +70,7 @@ func (s *SealPoller) poll(ctx context.Context) error { TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` AfterPrecommitMsg bool `db:"after_precommit_msg"` - TaskPrecommitMsgWait *int64 `db:"task_id_precommit_msg_wait"` - AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` + AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"` TaskPoRep *int64 `db:"task_id_porep"` PoRepProof []byte `db:"porep_proof"` @@ -93,7 +92,7 @@ func (s *SealPoller) poll(ctx context.Context) error { task_id_tree_c, after_tree_c, task_id_tree_r, after_tree_r, task_id_precommit_msg, after_precommit_msg, - task_id_precommit_msg_wait, after_precommit_msg_success, + after_precommit_msg_success, task_id_porep, porep_proof, task_id_commit_msg, after_commit_msg, task_id_commit_msg_wait, after_commit_msg_success, @@ -150,7 +149,11 @@ func (s *SealPoller) poll(ctx context.Context) error { }) } - if task.TaskPrecommitMsgWait == nil && task.AfterPrecommitMsg { + if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsg { + + } + + if task.AfterPrecommitMsg { // todo start precommit msg wait task } diff --git a/provider/lpseal/task_submit_precommit.go b/provider/lpseal/task_submit_precommit.go index 198d53e43..6b926f8f5 100644 --- a/provider/lpseal/task_submit_precommit.go +++ b/provider/lpseal/task_submit_precommit.go @@ -32,6 +32,17 @@ type SubmitPrecommitTask struct { maxFee types.FIL } +func NewSubmitPrecommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitPrecommitTaskApi, sender *lpmessage.Sender, maxFee types.FIL) *SubmitPrecommitTask { + return &SubmitPrecommitTask{ + sp: sp, + db: db, + api: api, + sender: sender, + + maxFee: maxFee, + } +} + func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { ctx := context.Background() diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 9fc4afe4f..08f99e3e9 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -102,8 +102,10 @@ func NewWdPostTask(db *harmonydb.DB, max: max, } - if err := pcs.AddHandler(t.processHeadChange); err != nil { - return nil, err + if pcs != nil { + if err := pcs.AddHandler(t.processHeadChange); err != nil { + return nil, err + } } return t, nil diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index 076ed51c1..27c501dde 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -77,8 +77,10 @@ func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender, actors: actors, } - if err := pcs.AddHandler(t.processHeadChange); err != nil { - return nil, err + if pcs != nil { + if err := pcs.AddHandler(t.processHeadChange); err != nil { + return nil, err + } } return t, nil diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index d6937354b..8d39c40ab 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -62,8 +62,10 @@ func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sen as: as, } - if err := pcs.AddHandler(res.processHeadChange); err != nil { - return nil, err + if pcs != nil { + if err := pcs.AddHandler(res.processHeadChange); err != nil { + return nil, err + } } return res, nil