lpseal: Wire up SubmitPrecommit

This commit is contained in:
Łukasz Magiera 2024-01-05 16:10:34 +01:00
parent 2f45646a92
commit 0244c7df50
11 changed files with 71 additions and 27 deletions

View File

@ -172,7 +172,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
return err
}
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil,
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, nil,
deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err

View File

@ -3,6 +3,7 @@ package tasks
import (
"context"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/provider/lpseal"
@ -34,6 +35,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
sender, sendTask := lpmessage.NewSender(full, full, db)
activeTasks = append(activeTasks, sendTask)
chainSched := chainsched.New(full)
///////////////////////////////////////////////////////////////////////
///// Task Selection
///////////////////////////////////////////////////////////////////////
@ -42,7 +45,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
chainSched, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return nil, err
}
@ -55,9 +58,9 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
}
}
hasAnySealingTask := cfg.Subsystems.EnableSealSDR
{
// Sealing
hasAnySealingTask := cfg.Subsystems.EnableSealSDR
var sp *lpseal.SealPoller
var slr *lpffi.SealCalls
@ -76,10 +79,31 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
treesTask := lpseal.NewTreesTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
activeTasks = append(activeTasks, treesTask)
}
if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := lpseal.NewSubmitPrecommitTask(sp, db, full, sender, cfg.Fees.MaxPreCommitGasFee)
activeTasks = append(activeTasks, precommitTask)
}
}
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))
return harmonytask.New(db, activeTasks, dependencies.ListenAddr)
ht, err := harmonytask.New(db, activeTasks, dependencies.ListenAddr)
if err != nil {
return nil, err
}
if hasAnySealingTask {
watcher, err := lpmessage.NewMessageWatcher(db, ht, chainSched, full)
if err != nil {
return nil, err
}
_ = watcher
}
if cfg.Subsystems.EnableWindowPost || hasAnySealingTask {
go chainSched.Run(ctx)
}
return ht, nil
}

View File

@ -1,5 +1,5 @@
create table message_waits (
signed_message_cid text primary key references message_sends (signed_cid),
signed_message_cid text primary key,
waiter_machine_id int references harmony_machines (id) on delete set null,
executed_tsk_cid text,

View File

@ -114,6 +114,10 @@ type ProviderSubsystemsConfig struct {
// In lotus-miner this was run as part of PreCommit2 (TreeD was run in PreCommit1).
EnableSealSDRTrees bool
SealSDRTreesMaxTasks int
// EnableSendPrecommitMsg enables the sending of precommit messages to the chain
// from this lotus-provider instance.
EnableSendPrecommitMsg bool
}
type DAGStoreConfig struct {

View File

@ -20,12 +20,10 @@ import (
//var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender, chainSched *chainsched.ProviderChainSched,
as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) {
chainSched := chainsched.New(api)
// todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
@ -44,7 +42,5 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co
return nil, nil, nil, err
}
go chainSched.Run(ctx)
return computeTask, submitTask, recoverTask, nil
}

View File

@ -24,7 +24,7 @@ type MessageWaiterApi interface {
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
}
type MessageWaiter struct {
type MessageWatcher struct {
db *harmonydb.DB
ht *harmonytask.TaskEngine
api MessageWaiterApi
@ -35,8 +35,8 @@ type MessageWaiter struct {
bestTs atomic.Pointer[types.TipSetKey]
}
func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched, api MessageWaiterApi) (*MessageWaiter, error) {
mw := &MessageWaiter{
func NewMessageWatcher(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched, api MessageWaiterApi) (*MessageWatcher, error) {
mw := &MessageWatcher{
db: db,
ht: ht,
api: api,
@ -51,7 +51,7 @@ func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsc
return mw, nil
}
func (mw *MessageWaiter) run() {
func (mw *MessageWatcher) run() {
defer close(mw.stopped)
for {
@ -65,7 +65,7 @@ func (mw *MessageWaiter) run() {
}
}
func (mw *MessageWaiter) update() {
func (mw *MessageWatcher) update() {
ctx := context.Background()
tsk := *mw.bestTs.Load()
@ -189,7 +189,7 @@ func (mw *MessageWaiter) update() {
}
}
func (mw *MessageWaiter) Stop(ctx context.Context) error {
func (mw *MessageWatcher) Stop(ctx context.Context) error {
close(mw.stopping)
select {
case <-mw.stopped:
@ -200,7 +200,7 @@ func (mw *MessageWaiter) Stop(ctx context.Context) error {
return nil
}
func (mw *MessageWaiter) processHeadChange(ctx context.Context, revert *types.TipSet, apply *types.TipSet) error {
func (mw *MessageWatcher) processHeadChange(ctx context.Context, revert *types.TipSet, apply *types.TipSet) error {
best := apply.Key()
mw.bestTs.Store(&best)
select {

View File

@ -70,7 +70,6 @@ func (s *SealPoller) poll(ctx context.Context) error {
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
AfterPrecommitMsg bool `db:"after_precommit_msg"`
TaskPrecommitMsgWait *int64 `db:"task_id_precommit_msg_wait"`
AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
TaskPoRep *int64 `db:"task_id_porep"`
@ -93,7 +92,7 @@ func (s *SealPoller) poll(ctx context.Context) error {
task_id_tree_c, after_tree_c,
task_id_tree_r, after_tree_r,
task_id_precommit_msg, after_precommit_msg,
task_id_precommit_msg_wait, after_precommit_msg_success,
after_precommit_msg_success,
task_id_porep, porep_proof,
task_id_commit_msg, after_commit_msg,
task_id_commit_msg_wait, after_commit_msg_success,
@ -150,7 +149,11 @@ func (s *SealPoller) poll(ctx context.Context) error {
})
}
if task.TaskPrecommitMsgWait == nil && task.AfterPrecommitMsg {
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsg {
}
if task.AfterPrecommitMsg {
// todo start precommit msg wait task
}

View File

@ -32,6 +32,17 @@ type SubmitPrecommitTask struct {
maxFee types.FIL
}
func NewSubmitPrecommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitPrecommitTaskApi, sender *lpmessage.Sender, maxFee types.FIL) *SubmitPrecommitTask {
return &SubmitPrecommitTask{
sp: sp,
db: db,
api: api,
sender: sender,
maxFee: maxFee,
}
}
func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.Background()

View File

@ -102,9 +102,11 @@ func NewWdPostTask(db *harmonydb.DB,
max: max,
}
if pcs != nil {
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
}
return t, nil
}

View File

@ -77,9 +77,11 @@ func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender,
actors: actors,
}
if pcs != nil {
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
}
return t, nil
}

View File

@ -62,9 +62,11 @@ func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sen
as: as,
}
if pcs != nil {
if err := pcs.AddHandler(res.processHeadChange); err != nil {
return nil, err
}
}
return res, nil
}