lpwindow: Submit Do

This commit is contained in:
Łukasz Magiera 2023-11-03 22:40:28 +01:00
parent c1add7d5eb
commit a686a995f6
6 changed files with 203 additions and 52 deletions

View File

@ -245,12 +245,12 @@ var runCmd = &cli.Command{
{ {
if cfg.Subsystems.EnableWindowPost { if cfg.Subsystems.EnableWindowPost {
wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil { if err != nil {
return err return err
} }
activeTasks = append(activeTasks, wdPostTask) activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
} }
} }
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)

View File

@ -27,6 +27,7 @@ create table wdpost_proofs
proof_message bytea, proof_message bytea,
submit_task_id bigint not null, submit_task_id bigint not null,
message_cid text not null,
constraint wdpost_proofs_identity_key constraint wdpost_proofs_identity_key
unique (sp_id, deadline, partition) unique (sp_id, deadline, partition)

View File

@ -2,6 +2,7 @@ package provider
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/provider/lpmessage"
"time" "time"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
@ -23,19 +24,26 @@ var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) { as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {
chainSched := chainsched.New(api) chainSched := chainsched.New(api)
// todo config // todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
task, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max) sender := lpmessage.NewSender(api, api, db)
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
if err != nil { if err != nil {
return nil, err return nil, nil, err
}
submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
if err != nil {
return nil, nil, err
} }
go chainSched.Run(ctx) go chainSched.Run(ctx)
return task, nil return computeTask, submitTask, nil
} }

View File

@ -9,15 +9,18 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
var log = logging.Logger("lpmessage")
type SenderAPI interface { type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error) WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error) MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolSend(context.Context, *types.SignedMessage) (cid.Cid, error) MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
} }
type SignerAPI interface { type SignerAPI interface {
@ -147,7 +150,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
} }
// push to mpool // push to mpool
_, err = s.api.MpoolSend(ctx, sigMsg) _, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
} }
@ -161,5 +164,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c) return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
} }
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
return cid.Undef, nil return cid.Undef, nil
} }

View File

@ -80,8 +80,37 @@ type wdTaskIdentity struct {
Partition_index uint64 Partition_index uint64
} }
func NewWdPostTask(db *harmonydb.DB,
api WDPoStAPI,
faultTracker sealer.FaultTracker,
prover ProverPoSt,
verifier storiface.Verifier,
pcs *chainsched.ProviderChainSched,
actors []dtypes.MinerAddress,
max int,
) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
faultTracker: faultTracker,
prover: prover,
verifier: verifier,
actors: actors,
max: max,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
log.Debugw("WdPostTask.Do() called with taskID: %v", taskID) log.Debugw("WdPostTask.Do()", "taskID", taskID)
var spID, pps, dlIdx, partIdx uint64 var spID, pps, dlIdx, partIdx uint64
@ -333,35 +362,6 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types
return nil return nil
} }
func NewWdPostTask(db *harmonydb.DB,
api WDPoStAPI,
faultTracker sealer.FaultTracker,
prover ProverPoSt,
verifier storiface.Verifier,
pcs *chainsched.ProviderChainSched,
actors []dtypes.MinerAddress,
max int,
) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
faultTracker: faultTracker,
prover: prover,
verifier: verifier,
actors: actors,
max: max,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) { func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
_, err := tx.Exec( _, err := tx.Exec(

View File

@ -1,7 +1,14 @@
package lpwindow package lpwindow
import ( import (
"bytes"
"context" "context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
@ -9,18 +16,160 @@ import (
"github.com/filecoin-project/lotus/lib/promise" "github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage" "github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/storage/ctladdr"
"golang.org/x/xerrors"
) )
type WdPoStSubmitTaskApi interface {
ChainHead(context.Context) (*types.TipSet, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
}
type WdPostSubmitTask struct { type WdPostSubmitTask struct {
sender *lpmessage.Sender sender *lpmessage.Sender
db *harmonydb.DB db *harmonydb.DB
api WdPoStSubmitTaskApi
maxWindowPoStGasFee types.FIL
as *ctladdr.AddressSelector
submitPoStTF promise.Promise[harmonytask.AddTaskFunc] submitPoStTF promise.Promise[harmonytask.AddTaskFunc]
} }
func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender, db *harmonydb.DB, api WdPoStSubmitTaskApi, maxWindowPoStGasFee types.FIL, as *ctladdr.AddressSelector) (*WdPostSubmitTask, error) {
res := &WdPostSubmitTask{
sender: send,
db: db,
api: api,
maxWindowPoStGasFee: maxWindowPoStGasFee,
as: as,
}
if err := pcs.AddHandler(res.processHeadChange); err != nil {
return nil, err
}
return res, nil
}
func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
//TODO implement me log.Debugw("WdPostSubmitTask.Do", "taskID", taskID)
panic("implement me")
var spID uint64
var deadline uint64
var partition uint64
var submitAtEpoch, submitByEpoch abi.ChainEpoch
var proofParamBytes []byte
var dbTask uint64
err = w.db.QueryRow(
context.Background(), `select sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id
from wdpost_proofs where sp_id = $1 and deadline = $2 and partition = $3`, spID, deadline, partition,
).Scan(&spID, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &proofParamBytes, &dbTask)
if err != nil {
return false, xerrors.Errorf("query post proof: %w", err)
}
if dbTask != uint64(taskID) {
return false, xerrors.Errorf("taskID mismatch: %d != %d", dbTask, taskID)
}
head, err := w.api.ChainHead(context.Background())
if err != nil {
return false, xerrors.Errorf("getting chain head: %w", err)
}
if head.Height() > submitByEpoch {
// we missed the deadline, no point in submitting
log.Errorw("missed submit deadline", "spID", spID, "deadline", deadline, "partition", partition, "submitByEpoch", submitByEpoch, "headHeight", head.Height())
return true, nil
}
if head.Height() < submitAtEpoch {
log.Errorw("submit epoch not reached", "spID", spID, "deadline", deadline, "partition", partition, "submitAtEpoch", submitAtEpoch, "headHeight", head.Height())
return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch)
}
var params miner.SubmitWindowedPoStParams
if err := params.UnmarshalCBOR(bytes.NewReader(proofParamBytes)); err != nil {
return false, xerrors.Errorf("unmarshaling proof message: %w", err)
}
maddr, err := address.NewIDAddress(spID)
if err != nil {
return false, xerrors.Errorf("invalid miner address: %w", err)
}
mi, err := w.api.StateMinerInfo(context.Background(), maddr, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("error getting miner info: %w", err)
}
msg := &types.Message{
To: maddr,
Method: builtin.MethodsMiner.SubmitWindowedPoSt,
Params: proofParamBytes,
Value: big.Zero(),
From: mi.Worker, // set worker for now for gas estimation
}
// calculate a more frugal estimation; premium is estimated to guarantee
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
// within 4 tipsets.
minGasFeeMsg := *msg
minGasFeeMsg.GasPremium, err = w.api.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK)
if err != nil {
log.Errorf("failed to estimate minimum gas premium: %+v", err)
minGasFeeMsg.GasPremium = msg.GasPremium
}
minGasFeeMsg.GasFeeCap, err = w.api.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK)
if err != nil {
log.Errorf("failed to estimate minimum gas fee cap: %+v", err)
minGasFeeMsg.GasFeeCap = msg.GasFeeCap
}
// goodFunds = funds needed for optimal inclusion probability.
// minFunds = funds needed for more speculative inclusion probability.
goodFunds := big.Add(msg.RequiredFunds(), msg.Value)
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds)
if err != nil {
return false, xerrors.Errorf("error getting address: %w", err)
}
msg.From = from
mss := &api.MessageSendSpec{
MaxFee: abi.TokenAmount(w.maxWindowPoStGasFee),
}
smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost")
if err != nil {
return false, xerrors.Errorf("sending proof message: %w", err)
}
// set message_cid in the wdpost_proofs entry
_, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and deadline = $3 and partition = $4`, smsg.String(), spID, deadline, partition)
if err != nil {
return true, xerrors.Errorf("updating wdpost_proofs: %w", err)
}
return true, nil
} }
func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
@ -37,18 +186,6 @@ func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta
return &ids[0], nil return &ids[0], nil
} }
func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) {
res := &WdPostSubmitTask{
sender: send,
}
if err := pcs.AddHandler(res.processHeadChange); err != nil {
return nil, err
}
return res, nil
}
func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{ return harmonytask.TaskTypeDetails{
Max: 128, Max: 128,