lpwindow: Wire up the recover task

This commit is contained in:
Łukasz Magiera 2023-11-09 17:32:32 +01:00
parent 3f100a45c8
commit 4391a5c12a
3 changed files with 41 additions and 7 deletions

View File

@ -244,12 +244,12 @@ var runCmd = &cli.Command{
{
if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask)
}
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)

View File

@ -23,7 +23,8 @@ var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {
as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) {
chainSched := chainsched.New(api)
@ -32,17 +33,22 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co
sender := lpmessage.NewSender(api, api, db)
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
recoverTask, err := lpwindow.NewWdPostRecoverDeclareTask(sender, db, api, ft, as, chainSched, fc.MaxWindowPoStGasFee, addresses)
if err != nil {
return nil, nil, nil, err
}
go chainSched.Run(ctx)
return computeTask, submitTask, nil
return computeTask, submitTask, recoverTask, nil
}

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/sealer"
@ -53,6 +54,33 @@ type WdPostRecoverDeclareTaskApi interface {
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
}
func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender,
db *harmonydb.DB,
api WdPostRecoverDeclareTaskApi,
faultTracker sealer.FaultTracker,
as *ctladdr.AddressSelector,
pcs *chainsched.ProviderChainSched,
maxDeclareRecoveriesGasFee types.FIL,
actors []dtypes.MinerAddress) (*WdPostRecoverDeclareTask, error) {
t := &WdPostRecoverDeclareTask{
sender: sender,
db: db,
api: api,
faultTracker: faultTracker,
maxDeclareRecoveriesGasFee: maxDeclareRecoveriesGasFee,
as: as,
actors: actors,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
log.Debugw("WdPostRecoverDeclareTask.Do()", "taskID", taskID)
ctx := context.Background()