diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 6967a2b44..38ebc57e3 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -244,12 +244,12 @@ var runCmd = &cli.Command{ { if cfg.Subsystems.EnableWindowPost { - wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err } - activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask) + activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) } } taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index 750b7ea25..1865f6afe 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -33,3 +33,16 @@ create table wdpost_proofs constraint wdpost_proofs_identity_key unique (sp_id, proving_period_start, deadline, partition) ); + +create table wdpost_recovery_tasks +( + task_id bigint not null + constraint wdpost_partition_tasks_pk + primary key, + sp_id bigint not null, + proving_period_start bigint not null, + deadline_index bigint not null, + partition_index bigint not null, + constraint wdpost_partition_tasks_identity_key + unique (sp_id, proving_period_start, deadline_index, partition_index) +); \ No newline at end of file diff --git a/provider/builder.go b/provider/builder.go index 37b4afae9..928eaaac7 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -23,7 +23,8 @@ var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, - as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) { + as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB, + stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) { chainSched := chainsched.New(api) @@ -32,17 +33,22 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co sender := lpmessage.NewSender(api, api, db) - computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max) + computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max) if err != nil { - return nil, nil, err + return nil, nil, nil, err } submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as) if err != nil { - return nil, nil, err + return nil, nil, nil, err + } + + recoverTask, err := lpwindow.NewWdPostRecoverDeclareTask(sender, db, api, ft, as, chainSched, fc.MaxWindowPoStGasFee, addresses) + if err != nil { + return nil, nil, nil, err } go chainSched.Run(ctx) - return computeTask, submitTask, nil + return computeTask, submitTask, recoverTask, nil } diff --git a/provider/lpwindow/compute_do.go b/provider/lpwindow/compute_do.go index 1b4d440a0..d36541e8e 100644 --- a/provider/lpwindow/compute_do.go +++ b/provider/lpwindow/compute_do.go @@ -3,6 +3,8 @@ package lpwindow import ( "bytes" "context" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/storage/sealer" "sort" "sync" "time" @@ -92,7 +94,7 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad return nil, xerrors.Errorf("copy toProve: %w", err) } if !disablePreChecks { - good, err = t.checkSectors(ctx, maddr, toProve, ts.Key()) + good, err = checkSectors(ctx, t.api, t.faultTracker, maddr, toProve, ts.Key()) if err != nil { return nil, xerrors.Errorf("checking sectors to skip: %w", err) } @@ -222,13 +224,18 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad return nil, xerrors.Errorf("failed to generate window post") } -func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) { +type CheckSectorsAPI interface { + StateMinerSectors(ctx context.Context, addr address.Address, bf *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) +} + +func checkSectors(ctx context.Context, api CheckSectorsAPI, ft sealer.FaultTracker, + maddr address.Address, check bitfield.BitField, tsk types.TipSetKey) (bitfield.BitField, error) { mid, err := address.IDFromAddress(maddr) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to convert to ID addr: %w", err) } - sectorInfos, err := t.api.StateMinerSectors(ctx, maddr, &check, tsk) + sectorInfos, err := api.StateMinerSectors(ctx, maddr, &check, tsk) if err != nil { return bitfield.BitField{}, xerrors.Errorf("failed to get sector infos: %w", err) } @@ -267,7 +274,7 @@ func (t *WdPostTask) checkSectors(ctx context.Context, maddr address.Address, ch return bitfield.BitField{}, xerrors.Errorf("failed to convert to v1_1 post proof: %w", err) } - bad, err := t.faultTracker.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) { + bad, err := ft.CheckProvable(ctx, pp, tocheck, func(ctx context.Context, id abi.SectorID) (cid.Cid, bool, error) { s, ok := sectors[id.Number] if !ok { return cid.Undef, false, xerrors.Errorf("sealed CID not found") diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go new file mode 100644 index 000000000..d50fddc0e --- /dev/null +++ b/provider/lpwindow/recover_task.go @@ -0,0 +1,316 @@ +package lpwindow + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/filecoin-project/lotus/provider/lpmessage" + "github.com/filecoin-project/lotus/storage/ctladdr" + "github.com/filecoin-project/lotus/storage/sealer" + "github.com/filecoin-project/lotus/storage/wdpost" + "golang.org/x/xerrors" +) + +type WdPostRecoverDeclareTask struct { + sender *lpmessage.Sender + db *harmonydb.DB + api WdPostRecoverDeclareTaskApi + faultTracker sealer.FaultTracker + + maxDeclareRecoveriesGasFee types.FIL + as *ctladdr.AddressSelector + actors []dtypes.MinerAddress + + startCheckTF promise.Promise[harmonytask.AddTaskFunc] +} + +type WdPostRecoverDeclareTaskApi interface { + ChainHead(context.Context) (*types.TipSet, error) + StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) + StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + StateMinerSectors(ctx context.Context, addr address.Address, bf *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) + + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) + GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) + GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) + + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) +} + +func NewWdPostRecoverDeclareTask(sender *lpmessage.Sender, + db *harmonydb.DB, + api WdPostRecoverDeclareTaskApi, + faultTracker sealer.FaultTracker, + as *ctladdr.AddressSelector, + pcs *chainsched.ProviderChainSched, + + maxDeclareRecoveriesGasFee types.FIL, + actors []dtypes.MinerAddress) (*WdPostRecoverDeclareTask, error) { + t := &WdPostRecoverDeclareTask{ + sender: sender, + db: db, + api: api, + faultTracker: faultTracker, + + maxDeclareRecoveriesGasFee: maxDeclareRecoveriesGasFee, + as: as, + actors: actors, + } + + if err := pcs.AddHandler(t.processHeadChange); err != nil { + return nil, err + } + + return t, nil +} + +func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + log.Debugw("WdPostRecoverDeclareTask.Do()", "taskID", taskID) + ctx := context.Background() + + var spID, pps, dlIdx, partIdx uint64 + + err = w.db.QueryRow(context.Background(), + `Select sp_id, proving_period_start, deadline_index, partition_index + from wdpost_recovery_tasks + where task_id = $1`, taskID).Scan( + &spID, &pps, &dlIdx, &partIdx, + ) + if err != nil { + log.Errorf("WdPostRecoverDeclareTask.Do() failed to queryRow: %v", err) + return false, err + } + + head, err := w.api.ChainHead(context.Background()) + if err != nil { + log.Errorf("WdPostRecoverDeclareTask.Do() failed to get chain head: %v", err) + return false, err + } + + deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height()) + + if deadline.FaultCutoffPassed() { + log.Errorf("WdPostRecover removed stale task: %v %v", taskID, deadline) + return true, nil + } + + maddr, err := address.NewIDAddress(spID) + if err != nil { + log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err) + return false, err + } + + partitions, err := w.api.StateMinerPartitions(context.Background(), maddr, dlIdx, head.Key()) + if err != nil { + log.Errorf("WdPostRecoverDeclareTask.Do() failed to get partitions: %v", err) + return false, err + } + + if partIdx >= uint64(len(partitions)) { + log.Errorf("WdPostRecoverDeclareTask.Do() failed to get partitions: partIdx >= len(partitions)") + return false, err + } + + partition := partitions[partIdx] + + unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors) + if err != nil { + return false, xerrors.Errorf("subtracting recovered set from fault set: %w", err) + } + + uc, err := unrecovered.Count() + if err != nil { + return false, xerrors.Errorf("counting unrecovered sectors: %w", err) + } + + if uc == 0 { + log.Warnw("nothing to declare recovered", "maddr", maddr, "deadline", deadline, "partition", partIdx) + return true, nil + } + + recovered, err := checkSectors(ctx, w.api, w.faultTracker, maddr, unrecovered, head.Key()) + if err != nil { + return false, xerrors.Errorf("checking unrecovered sectors: %w", err) + } + + // if all sectors failed to recover, don't declare recoveries + recoveredCount, err := recovered.Count() + if err != nil { + return false, xerrors.Errorf("counting recovered sectors: %w", err) + } + + if recoveredCount == 0 { + log.Warnw("no sectors recovered", "maddr", maddr, "deadline", deadline, "partition", partIdx) + return true, nil + } + + recDecl := miner.RecoveryDeclaration{ + Deadline: dlIdx, + Partition: uint64(partIdx), + Sectors: recovered, + } + + params := &miner.DeclareFaultsRecoveredParams{ + Recoveries: []miner.RecoveryDeclaration{recDecl}, + } + + enc, aerr := actors.SerializeParams(params) + if aerr != nil { + return false, xerrors.Errorf("could not serialize declare recoveries parameters: %w", aerr) + } + + msg := &types.Message{ + To: maddr, + Method: builtin.MethodsMiner.DeclareFaultsRecovered, + Params: enc, + Value: types.NewInt(0), + } + + msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxDeclareRecoveriesGasFee)) + + mc, err := w.sender.Send(ctx, msg, mss, "declare-recoveries") + if err != nil { + return false, xerrors.Errorf("sending declare recoveries message: %w", err) + } + + log.Debugw("WdPostRecoverDeclareTask.Do() sent declare recoveries message", "maddr", maddr, "deadline", deadline, "partition", partIdx, "mc", mc) + return true, nil +} + +func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + if len(ids) == 0 { + // probably can't happen, but panicking is bad + return nil, nil + } + + if w.sender == nil { + // we can't send messages + return nil, nil + } + + return &ids[0], nil +} + +func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 128, + Name: "WdPostRecoverDeclare", + Cost: resources.Resources{ + Cpu: 1, + Gpu: 0, + Ram: 128 << 20, + }, + MaxFailures: 10, + Follows: nil, + } +} + +func (w *WdPostRecoverDeclareTask) Adder(taskFunc harmonytask.AddTaskFunc) { + w.startCheckTF.Set(taskFunc) +} + +func (w *WdPostRecoverDeclareTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { + tf := w.startCheckTF.Val(ctx) + + for _, act := range w.actors { + maddr := address.Address(act) + + aid, err := address.IDFromAddress(maddr) + if err != nil { + return xerrors.Errorf("getting miner ID: %w", err) + } + + di, err := w.api.StateMinerProvingDeadline(ctx, maddr, apply.Key()) + if err != nil { + return err + } + + if !di.PeriodStarted() { + return nil // not proving anything yet + } + + // declaring two deadlines ahead + declDeadline := (di.Index + 2) % di.WPoStPeriodDeadlines + + pps := di.PeriodStart + if declDeadline != di.Index+2 { + pps = di.NextPeriodStart() + } + + partitions, err := w.api.StateMinerPartitions(ctx, maddr, declDeadline, apply.Key()) + if err != nil { + return xerrors.Errorf("getting partitions: %w", err) + } + + for pidx, partition := range partitions { + unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors) + if err != nil { + return xerrors.Errorf("subtracting recovered set from fault set: %w", err) + } + + uc, err := unrecovered.Count() + if err != nil { + return xerrors.Errorf("counting unrecovered sectors: %w", err) + } + + if uc == 0 { + log.Debugw("WdPostRecoverDeclareTask.processHeadChange() uc == 0, skipping", "maddr", maddr, "declDeadline", declDeadline, "pidx", pidx) + continue + } + + tid := wdTaskIdentity{ + Sp_id: aid, + Proving_period_start: pps, + Deadline_index: declDeadline, + Partition_index: uint64(pidx), + } + + tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { + return w.addTaskToDB(id, tid, tx) + }) + } + } + + return nil +} + +func (w *WdPostRecoverDeclareTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) { + _, err := tx.Exec( + `INSERT INTO wdpost_recovery_tasks ( + task_id, + sp_id, + proving_period_start, + deadline_index, + partition_index + ) VALUES ($1, $2, $3, $4, $5)`, + taskId, + taskIdent.Sp_id, + taskIdent.Proving_period_start, + taskIdent.Deadline_index, + taskIdent.Partition_index, + ) + if err != nil { + return false, xerrors.Errorf("insert partition task: %w", err) + } + + return true, nil +} + +var _ harmonytask.TaskInterface = &WdPostRecoverDeclareTask{} diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index 42979339b..ba0f63f6d 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -36,6 +36,7 @@ type WdPoStSubmitTaskApi interface { StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) } @@ -136,51 +137,16 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("invalid miner address: %w", err) } - mi, err := w.api.StateMinerInfo(context.Background(), maddr, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("error getting miner info: %w", err) - } - msg := &types.Message{ To: maddr, Method: builtin.MethodsMiner.SubmitWindowedPoSt, Params: pbuf.Bytes(), Value: big.Zero(), - - From: mi.Worker, // set worker for now for gas estimation } - // calculate a more frugal estimation; premium is estimated to guarantee - // inclusion within 5 tipsets, and fee cap is estimated for inclusion - // within 4 tipsets. - minGasFeeMsg := *msg - - minGasFeeMsg.GasPremium, err = w.api.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK) + msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxWindowPoStGasFee)) if err != nil { - log.Errorf("failed to estimate minimum gas premium: %+v", err) - minGasFeeMsg.GasPremium = msg.GasPremium - } - - minGasFeeMsg.GasFeeCap, err = w.api.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK) - if err != nil { - log.Errorf("failed to estimate minimum gas fee cap: %+v", err) - minGasFeeMsg.GasFeeCap = msg.GasFeeCap - } - - // goodFunds = funds needed for optimal inclusion probability. - // minFunds = funds needed for more speculative inclusion probability. - goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value) - minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) - - from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds) - if err != nil { - return false, xerrors.Errorf("error getting address: %w", err) - } - - msg.From = from - - mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(w.maxWindowPoStGasFee), + return false, xerrors.Errorf("preparing proof message: %w", err) } smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost") @@ -269,4 +235,70 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply return nil } +type MsgPrepAPI interface { + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) + GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) + GasEstimateGasPremium(ctx context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) + + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) +} + +func preparePoStMessage(w MsgPrepAPI, as *ctladdr.AddressSelector, maddr address.Address, msg *types.Message, maxFee abi.TokenAmount) (*types.Message, *api.MessageSendSpec, error) { + mi, err := w.StateMinerInfo(context.Background(), maddr, types.EmptyTSK) + if err != nil { + return nil, nil, xerrors.Errorf("error getting miner info: %w", err) + } + + // set the worker as a fallback + msg.From = mi.Worker + + mss := &api.MessageSendSpec{ + MaxFee: abi.TokenAmount(maxFee), + } + + // (optimal) initial estimation with some overestimation that guarantees + // block inclusion within the next 20 tipsets. + gm, err := w.GasEstimateMessageGas(context.Background(), msg, mss, types.EmptyTSK) + if err != nil { + log.Errorw("estimating gas", "error", err) + return nil, nil, xerrors.Errorf("estimating gas: %w", err) + } + *msg = *gm + + // calculate a more frugal estimation; premium is estimated to guarantee + // inclusion within 5 tipsets, and fee cap is estimated for inclusion + // within 4 tipsets. + minGasFeeMsg := *msg + + minGasFeeMsg.GasPremium, err = w.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK) + if err != nil { + log.Errorf("failed to estimate minimum gas premium: %+v", err) + minGasFeeMsg.GasPremium = msg.GasPremium + } + + minGasFeeMsg.GasFeeCap, err = w.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK) + if err != nil { + log.Errorf("failed to estimate minimum gas fee cap: %+v", err) + minGasFeeMsg.GasFeeCap = msg.GasFeeCap + } + + // goodFunds = funds needed for optimal inclusion probability. + // minFunds = funds needed for more speculative inclusion probability. + goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value) + minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) + + from, _, err := as.AddressFor(context.Background(), w, mi, api.PoStAddr, goodFunds, minFunds) + if err != nil { + return nil, nil, xerrors.Errorf("error getting address: %w", err) + } + + msg.From = from + + return msg, mss, nil +} + var _ harmonytask.TaskInterface = &WdPostSubmitTask{}