Merge pull request #11404 from filecoin-project/feat/lp-decl-recover
feat: wdpost: Declare-recovered task
This commit is contained in:
commit
ac0e75734f
@ -244,12 +244,12 @@ var runCmd = &cli.Command{
|
|||||||
{
|
{
|
||||||
|
|
||||||
if cfg.Subsystems.EnableWindowPost {
|
if cfg.Subsystems.EnableWindowPost {
|
||||||
wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
|
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
|
||||||
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
|
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
|
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
|
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
|
||||||
|
@ -33,3 +33,16 @@ create table wdpost_proofs
|
|||||||
constraint wdpost_proofs_identity_key
|
constraint wdpost_proofs_identity_key
|
||||||
unique (sp_id, proving_period_start, deadline, partition)
|
unique (sp_id, proving_period_start, deadline, partition)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
create table wdpost_recovery_tasks
|
||||||
|
(
|
||||||
|
task_id bigint not null
|
||||||
|
constraint wdpost_partition_tasks_pk
|
||||||
|
primary key,
|
||||||
|
sp_id bigint not null,
|
||||||
|
proving_period_start bigint not null,
|
||||||
|
deadline_index bigint not null,
|
||||||
|
partition_index bigint not null,
|
||||||
|
constraint wdpost_partition_tasks_identity_key
|
||||||
|
unique (sp_id, proving_period_start, deadline_index, partition_index)
|
||||||
|
);
|
@ -23,7 +23,8 @@ var log = logging.Logger("provider")
|
|||||||
|
|
||||||
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
|
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
|
||||||
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
|
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
|
||||||
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {
|
as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB,
|
||||||
|
stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) {
|
||||||
|
|
||||||
chainSched := chainsched.New(api)
|
chainSched := chainsched.New(api)
|
||||||
|
|
||||||
@ -32,17 +33,22 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co
|
|||||||
|
|
||||||
sender := lpmessage.NewSender(api, api, db)
|
sender := lpmessage.NewSender(api, api, db)
|
||||||
|
|
||||||
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
|
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
|
submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
recoverTask, err := lpwindow.NewWdPostRecoverDeclareTask(sender, db, api, ft, as, chainSched, fc.MaxWindowPoStGasFee, addresses)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
go chainSched.Run(ctx)
|
go chainSched.Run(ctx)
|
||||||
|
|
||||||
return computeTask, submitTask, nil
|
return computeTask, submitTask, recoverTask, nil
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,8 @@ package lpwindow
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -92,7 +94,7 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad
|
|||||||
return nil, xerrors.Errorf("copy toProve: %w", err)
|
return nil, xerrors.Errorf("copy toProve: %w", err)
|
||||||
}
|
}
|
||||||
if !disablePreChecks {
|
if !disablePreChecks {
|
||||||
good, err = t.checkSectors(ctx, maddr, toProve, ts.Key())
|
good, err = checkSectors(ctx, t.api, t.faultTracker, maddr, toProve, ts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
|
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
|
||||||
}
|
}
|
||||||
@ -222,13 +224,18 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad
|
|||||||
return nil, xerrors.Errorf("failed to generate window post")
|
return nil, xerrors.Errorf("failed to generate window post")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) {
|
type CheckSectorsAPI interface {
|
||||||
|
StateMinerSectors(ctx context.Context, addr address.Address, bf *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkSectors(ctx context.Context, api CheckSectorsAPI, ft sealer.FaultTracker,
|
||||||
|
maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) {
|
||||||
mid, err := address.IDFromAddress(maddr)
|
mid, err := address.IDFromAddress(maddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bitfield.BitField{}, xerrors.Errorf("failed to convert to ID addr: %w", err)
|
return bitfield.BitField{}, xerrors.Errorf("failed to convert to ID addr: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sectorInfos, err := t.api.StateMinerSectors(ctx, maddr, &check, tsk)
|
sectorInfos, err := api.StateMinerSectors(ctx, maddr, &check, tsk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return bitfield.BitField{}, xerrors.Errorf("failed to get sector infos: %w", err)
|
return bitfield.BitField{}, xerrors.Errorf("failed to get sector infos: %w", err)
|
||||||
}
|
}
|
||||||
@ -267,7 +274,7 @@ func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, ch
|
|||||||
return bitfield.BitField{}, xerrors.Errorf("failed to convert to v1_1 post proof: %w", err)
|
return bitfield.BitField{}, xerrors.Errorf("failed to convert to v1_1 post proof: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bad, err := t.faultTracker.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) {
|
bad, err := ft.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) {
|
||||||
s, ok := sectors[id.Number]
|
s, ok := sectors[id.Number]
|
||||||
if !ok {
|
if !ok {
|
||||||
return cid.Undef, false, xerrors.Errorf("sealed CID not found")
|
return cid.Undef, false, xerrors.Errorf("sealed CID not found")
|
||||||
|
316
provider/lpwindow/recover_task.go
Normal file
316
provider/lpwindow/recover_task.go
Normal file
@ -0,0 +1,316 @@
|
|||||||
|
package lpwindow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-bitfield"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-state-types/builtin"
|
||||||
|
"github.com/filecoin-project/go-state-types/dline"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||||
|
"github.com/filecoin-project/lotus/lib/promise"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/filecoin-project/lotus/provider/chainsched"
|
||||||
|
"github.com/filecoin-project/lotus/provider/lpmessage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/ctladdr"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer"
|
||||||
|
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WdPostRecoverDeclareTask struct {
|
||||||
|
sender *lpmessage.Sender
|
||||||
|
db *harmonydb.DB
|
||||||
|
api WdPostRecoverDeclareTaskApi
|
||||||
|
faultTracker sealer.FaultTracker
|
||||||
|
|
||||||
|
maxDeclareRecoveriesGasFee types.FIL
|
||||||
|
as *ctladdr.AddressSelector
|
||||||
|
actors []dtypes.MinerAddress
|
||||||
|
|
||||||
|
startCheckTF promise.Promise[harmonytask.AddTaskFunc]
|
||||||
|
}
|
||||||
|
|
||||||
|
type WdPostRecoverDeclareTaskApi interface {
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
|
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
|
||||||
|
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
|
||||||
|
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
||||||
|
StateMinerSectors(ctx context.Context, addr address.Address, bf *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
|
||||||
|
|
||||||
|
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
|
||||||
|
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
|
||||||
|
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
|
||||||
|
|
||||||
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
|
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender,
|
||||||
|
db *harmonydb.DB,
|
||||||
|
api WdPostRecoverDeclareTaskApi,
|
||||||
|
faultTracker sealer.FaultTracker,
|
||||||
|
as *ctladdr.AddressSelector,
|
||||||
|
pcs *chainsched.ProviderChainSched,
|
||||||
|
|
||||||
|
maxDeclareRecoveriesGasFee types.FIL,
|
||||||
|
actors []dtypes.MinerAddress) (*WdPostRecoverDeclareTask, error) {
|
||||||
|
t := &WdPostRecoverDeclareTask{
|
||||||
|
sender: sender,
|
||||||
|
db: db,
|
||||||
|
api: api,
|
||||||
|
faultTracker: faultTracker,
|
||||||
|
|
||||||
|
maxDeclareRecoveriesGasFee: maxDeclareRecoveriesGasFee,
|
||||||
|
as: as,
|
||||||
|
actors: actors,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := pcs.AddHandler(t.processHeadChange); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||||
|
log.Debugw("WdPostRecoverDeclareTask.Do()", "taskID", taskID)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
var spID, pps, dlIdx, partIdx uint64
|
||||||
|
|
||||||
|
err = w.db.QueryRow(context.Background(),
|
||||||
|
`Select sp_id, proving_period_start, deadline_index, partition_index
|
||||||
|
from wdpost_recovery_tasks
|
||||||
|
where task_id = $1`, taskID).Scan(
|
||||||
|
&spID, &pps, &dlIdx, &partIdx,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("WdPostRecoverDeclareTask.Do() failed to queryRow: %v", err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
head, err := w.api.ChainHead(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("WdPostRecoverDeclareTask.Do() failed to get chain head: %v", err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())
|
||||||
|
|
||||||
|
if deadline.FaultCutoffPassed() {
|
||||||
|
log.Errorf("WdPostRecover removed stale task: %v %v", taskID, deadline)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
maddr, err := address.NewIDAddress(spID)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
partitions, err := w.api.StateMinerPartitions(context.Background(), maddr, dlIdx, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("WdPostRecoverDeclareTask.Do() failed to get partitions: %v", err)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if partIdx >= uint64(len(partitions)) {
|
||||||
|
log.Errorf("WdPostRecoverDeclareTask.Do() failed to get partitions: partIdx >= len(partitions)")
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
partition := partitions[partIdx]
|
||||||
|
|
||||||
|
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
uc, err := unrecovered.Count()
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("counting unrecovered sectors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if uc == 0 {
|
||||||
|
log.Warnw("nothing to declare recovered", "maddr", maddr, "deadline", deadline, "partition", partIdx)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
recovered, err := checkSectors(ctx, w.api, w.faultTracker, maddr, unrecovered, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("checking unrecovered sectors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if all sectors failed to recover, don't declare recoveries
|
||||||
|
recoveredCount, err := recovered.Count()
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("counting recovered sectors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if recoveredCount == 0 {
|
||||||
|
log.Warnw("no sectors recovered", "maddr", maddr, "deadline", deadline, "partition", partIdx)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
recDecl := miner.RecoveryDeclaration{
|
||||||
|
Deadline: dlIdx,
|
||||||
|
Partition: uint64(partIdx),
|
||||||
|
Sectors: recovered,
|
||||||
|
}
|
||||||
|
|
||||||
|
params := &miner.DeclareFaultsRecoveredParams{
|
||||||
|
Recoveries: []miner.RecoveryDeclaration{recDecl},
|
||||||
|
}
|
||||||
|
|
||||||
|
enc, aerr := actors.SerializeParams(params)
|
||||||
|
if aerr != nil {
|
||||||
|
return false, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &types.Message{
|
||||||
|
To: maddr,
|
||||||
|
Method: builtin.MethodsMiner.DeclareFaultsRecovered,
|
||||||
|
Params: enc,
|
||||||
|
Value: types.NewInt(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxDeclareRecoveriesGasFee))
|
||||||
|
|
||||||
|
mc, err := w.sender.Send(ctx, msg, mss, "declare-recoveries")
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("sending declare recoveries message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugw("WdPostRecoverDeclareTask.Do() sent declare recoveries message", "maddr", maddr, "deadline", deadline, "partition", partIdx, "mc", mc)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
||||||
|
if len(ids) == 0 {
|
||||||
|
// probably can't happen, but panicking is bad
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.sender == nil {
|
||||||
|
// we can't send messages
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ids[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||||
|
return harmonytask.TaskTypeDetails{
|
||||||
|
Max: 128,
|
||||||
|
Name: "WdPostRecoverDeclare",
|
||||||
|
Cost: resources.Resources{
|
||||||
|
Cpu: 1,
|
||||||
|
Gpu: 0,
|
||||||
|
Ram: 128 << 20,
|
||||||
|
},
|
||||||
|
MaxFailures: 10,
|
||||||
|
Follows: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) Adder(taskFunc harmonytask.AddTaskFunc) {
|
||||||
|
w.startCheckTF.Set(taskFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
|
||||||
|
tf := w.startCheckTF.Val(ctx)
|
||||||
|
|
||||||
|
for _, act := range w.actors {
|
||||||
|
maddr := address.Address(act)
|
||||||
|
|
||||||
|
aid, err := address.IDFromAddress(maddr)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting miner ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
di, err := w.api.StateMinerProvingDeadline(ctx, maddr, apply.Key())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !di.PeriodStarted() {
|
||||||
|
return nil // not proving anything yet
|
||||||
|
}
|
||||||
|
|
||||||
|
// declaring two deadlines ahead
|
||||||
|
declDeadline := (di.Index + 2) % di.WPoStPeriodDeadlines
|
||||||
|
|
||||||
|
pps := di.PeriodStart
|
||||||
|
if declDeadline != di.Index+2 {
|
||||||
|
pps = di.NextPeriodStart()
|
||||||
|
}
|
||||||
|
|
||||||
|
partitions, err := w.api.StateMinerPartitions(ctx, maddr, declDeadline, apply.Key())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting partitions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for pidx, partition := range partitions {
|
||||||
|
unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("subtracting recovered set from fault set: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
uc, err := unrecovered.Count()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("counting unrecovered sectors: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if uc == 0 {
|
||||||
|
log.Debugw("WdPostRecoverDeclareTask.processHeadChange() uc == 0, skipping", "maddr", maddr, "declDeadline", declDeadline, "pidx", pidx)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
tid := wdTaskIdentity{
|
||||||
|
Sp_id: aid,
|
||||||
|
Proving_period_start: pps,
|
||||||
|
Deadline_index: declDeadline,
|
||||||
|
Partition_index: uint64(pidx),
|
||||||
|
}
|
||||||
|
|
||||||
|
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
|
||||||
|
return w.addTaskToDB(id, tid, tx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostRecoverDeclareTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
|
||||||
|
_, err := tx.Exec(
|
||||||
|
`INSERT INTO wdpost_recovery_tasks (
|
||||||
|
task_id,
|
||||||
|
sp_id,
|
||||||
|
proving_period_start,
|
||||||
|
deadline_index,
|
||||||
|
partition_index
|
||||||
|
) VALUES ($1, $2, $3, $4, $5)`,
|
||||||
|
taskId,
|
||||||
|
taskIdent.Sp_id,
|
||||||
|
taskIdent.Proving_period_start,
|
||||||
|
taskIdent.Deadline_index,
|
||||||
|
taskIdent.Partition_index,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("insert partition task: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ harmonytask.TaskInterface = &WdPostRecoverDeclareTask{}
|
@ -36,6 +36,7 @@ type WdPoStSubmitTaskApi interface {
|
|||||||
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
||||||
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
|
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
|
||||||
|
|
||||||
|
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
|
||||||
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
|
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
|
||||||
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
|
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
|
||||||
}
|
}
|
||||||
@ -136,51 +137,16 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
|
|||||||
return false, xerrors.Errorf("invalid miner address: %w", err)
|
return false, xerrors.Errorf("invalid miner address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mi, err := w.api.StateMinerInfo(context.Background(), maddr, types.EmptyTSK)
|
|
||||||
if err != nil {
|
|
||||||
return false, xerrors.Errorf("error getting miner info: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
To: maddr,
|
To: maddr,
|
||||||
Method: builtin.MethodsMiner.SubmitWindowedPoSt,
|
Method: builtin.MethodsMiner.SubmitWindowedPoSt,
|
||||||
Params: pbuf.Bytes(),
|
Params: pbuf.Bytes(),
|
||||||
Value: big.Zero(),
|
Value: big.Zero(),
|
||||||
|
|
||||||
From: mi.Worker, // set worker for now for gas estimation
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// calculate a more frugal estimation; premium is estimated to guarantee
|
msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxWindowPoStGasFee))
|
||||||
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
|
|
||||||
// within 4 tipsets.
|
|
||||||
minGasFeeMsg := *msg
|
|
||||||
|
|
||||||
minGasFeeMsg.GasPremium, err = w.api.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to estimate minimum gas premium: %+v", err)
|
return false, xerrors.Errorf("preparing proof message: %w", err)
|
||||||
minGasFeeMsg.GasPremium = msg.GasPremium
|
|
||||||
}
|
|
||||||
|
|
||||||
minGasFeeMsg.GasFeeCap, err = w.api.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed to estimate minimum gas fee cap: %+v", err)
|
|
||||||
minGasFeeMsg.GasFeeCap = msg.GasFeeCap
|
|
||||||
}
|
|
||||||
|
|
||||||
// goodFunds = funds needed for optimal inclusion probability.
|
|
||||||
// minFunds = funds needed for more speculative inclusion probability.
|
|
||||||
goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value)
|
|
||||||
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
|
|
||||||
|
|
||||||
from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds)
|
|
||||||
if err != nil {
|
|
||||||
return false, xerrors.Errorf("error getting address: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.From = from
|
|
||||||
|
|
||||||
mss := &api.MessageSendSpec{
|
|
||||||
MaxFee: abi.TokenAmount(w.maxWindowPoStGasFee),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost")
|
smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost")
|
||||||
@ -269,4 +235,70 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MsgPrepAPI interface {
|
||||||
|
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
||||||
|
GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error)
|
||||||
|
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
|
||||||
|
GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
|
||||||
|
|
||||||
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
|
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func preparePoStMessage(w MsgPrepAPI, as *ctladdr.AddressSelector, maddr address.Address, msg *types.Message, maxFee abi.TokenAmount) (*types.Message, *api.MessageSendSpec, error) {
|
||||||
|
mi, err := w.StateMinerInfo(context.Background(), maddr, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, xerrors.Errorf("error getting miner info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the worker as a fallback
|
||||||
|
msg.From = mi.Worker
|
||||||
|
|
||||||
|
mss := &api.MessageSendSpec{
|
||||||
|
MaxFee: abi.TokenAmount(maxFee),
|
||||||
|
}
|
||||||
|
|
||||||
|
// (optimal) initial estimation with some overestimation that guarantees
|
||||||
|
// block inclusion within the next 20 tipsets.
|
||||||
|
gm, err := w.GasEstimateMessageGas(context.Background(), msg, mss, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorw("estimating gas", "error", err)
|
||||||
|
return nil, nil, xerrors.Errorf("estimating gas: %w", err)
|
||||||
|
}
|
||||||
|
*msg = *gm
|
||||||
|
|
||||||
|
// calculate a more frugal estimation; premium is estimated to guarantee
|
||||||
|
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
|
||||||
|
// within 4 tipsets.
|
||||||
|
minGasFeeMsg := *msg
|
||||||
|
|
||||||
|
minGasFeeMsg.GasPremium, err = w.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to estimate minimum gas premium: %+v", err)
|
||||||
|
minGasFeeMsg.GasPremium = msg.GasPremium
|
||||||
|
}
|
||||||
|
|
||||||
|
minGasFeeMsg.GasFeeCap, err = w.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to estimate minimum gas fee cap: %+v", err)
|
||||||
|
minGasFeeMsg.GasFeeCap = msg.GasFeeCap
|
||||||
|
}
|
||||||
|
|
||||||
|
// goodFunds = funds needed for optimal inclusion probability.
|
||||||
|
// minFunds = funds needed for more speculative inclusion probability.
|
||||||
|
goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value)
|
||||||
|
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
|
||||||
|
|
||||||
|
from, _, err := as.AddressFor(context.Background(), w, mi, api.PoStAddr, goodFunds, minFunds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, xerrors.Errorf("error getting address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.From = from
|
||||||
|
|
||||||
|
return msg, mss, nil
|
||||||
|
}
|
||||||
|
|
||||||
var _ harmonytask.TaskInterface = &WdPostSubmitTask{}
|
var _ harmonytask.TaskInterface = &WdPostSubmitTask{}
|
||||||
|
Loading…
Reference in New Issue
Block a user