lpwindow: IT SUBMITS

This commit is contained in:
Łukasz Magiera 2023-11-04 12:32:27 +01:00
parent 4f9e168017
commit edd95f6524
5 changed files with 69 additions and 31 deletions

View File

@ -20,15 +20,16 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit
create table wdpost_proofs create table wdpost_proofs
( (
sp_id bigint not null, sp_id bigint not null,
proving_period_start bigint not null,
deadline bigint not null, deadline bigint not null,
partition bigint not null, partition bigint not null,
submit_at_epoch bigint not null, submit_at_epoch bigint not null,
submit_by_epoch bigint not null, submit_by_epoch bigint not null,
proof_message bytea, proof_message bytea,
submit_task_id bigint not null, submit_task_id bigint,
message_cid text not null, message_cid text,
constraint wdpost_proofs_identity_key constraint wdpost_proofs_identity_key
unique (sp_id, deadline, partition) unique (sp_id, proving_period_start, deadline, partition)
); );

View File

@ -115,7 +115,7 @@ top:
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
stackSlice := make([]byte, 512) stackSlice := make([]byte, 4092)
sz := runtime.Stack(stackSlice, false) sz := runtime.Stack(stackSlice, false)
log.Error("Recovered from a serious error "+ log.Error("Recovered from a serious error "+
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r, "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r,

View File

@ -77,6 +77,8 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("getting key address: %w", err) return cid.Undef, xerrors.Errorf("getting key address: %w", err)
} }
msg.From = fromA
if msg.Nonce != 0 { if msg.Nonce != 0 {
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce) return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
} }
@ -107,14 +109,14 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
} }
// get nonce from db // get nonce from db
var dbNonce uint64 var dbNonce *uint64
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
if err := r.Scan(&dbNonce); err != nil { if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err) return false, xerrors.Errorf("getting nonce from db: %w", err)
} }
if dbNonce+1 > msgNonce { if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = dbNonce + 1 msgNonce = *dbNonce + 1
} }
msg.Nonce = msgNonce msg.Nonce = msgNonce
@ -169,5 +171,5 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
return cid.Undef, nil return sigMsg.Cid(), nil
} }

View File

@ -162,22 +162,33 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
} }
// Insert into wdpost_proofs table // Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(), n, err := t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs ( `INSERT INTO wdpost_proofs (
sp_id, sp_id,
proving_period_start,
deadline, deadline,
partition, partition,
submit_at_epoch, submit_at_epoch,
submit_by_epoch, submit_by_epoch,
proof_message) proof_message)
VALUES ($1, $2, $3, $4, $5, $6)`, VALUES ($1, $2, $3, $4, $5, $6, $7)`,
spID, spID,
pps,
deadline.Index, deadline.Index,
partIdx, partIdx,
deadline.Open, deadline.Open,
deadline.Close, deadline.Close,
msgbuf.Bytes()) msgbuf.Bytes())
if err != nil {
log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
return false, err
}
if n != 1 {
log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
return false, err
}
return true, nil return true, nil
} }
@ -379,7 +390,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden
taskIdent.Partition_index, taskIdent.Partition_index,
) )
if err != nil { if err != nil {
return false, err return false, xerrors.Errorf("insert partition task: %w", err)
} }
return true, nil return true, nil

View File

@ -3,6 +3,8 @@ package lpwindow
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/storage/wdpost"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,6 +34,7 @@ type WdPoStSubmitTaskApi interface {
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
@ -71,14 +74,14 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
var spID uint64 var spID uint64
var deadline uint64 var deadline uint64
var partition uint64 var partition uint64
var submitAtEpoch, submitByEpoch abi.ChainEpoch var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch
var proofParamBytes []byte var earlyParamBytes []byte
var dbTask uint64 var dbTask uint64
err = w.db.QueryRow( err = w.db.QueryRow(
context.Background(), `select sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id context.Background(), `select sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id
from wdpost_proofs where sp_id = $1 and deadline = $2 and partition = $3`, spID, deadline, partition, from wdpost_proofs where submit_task_id = $1`, taskID,
).Scan(&spID, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &proofParamBytes, &dbTask) ).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask)
if err != nil { if err != nil {
return false, xerrors.Errorf("query post proof: %w", err) return false, xerrors.Errorf("query post proof: %w", err)
} }
@ -103,11 +106,31 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch) return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch)
} }
dlInfo := wdpost.NewDeadlineInfo(pps, deadline, head.Height())
var params miner.SubmitWindowedPoStParams var params miner.SubmitWindowedPoStParams
if err := params.UnmarshalCBOR(bytes.NewReader(proofParamBytes)); err != nil { if err := params.UnmarshalCBOR(bytes.NewReader(earlyParamBytes)); err != nil {
return false, xerrors.Errorf("unmarshaling proof message: %w", err) return false, xerrors.Errorf("unmarshaling proof message: %w", err)
} }
commEpoch := dlInfo.Challenge
commRand, err := w.api.StateGetRandomnessFromTickets(context.Background(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil, head.Key())
if err != nil {
err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (epoch=%d): %w", commEpoch, err)
log.Errorf("submitPoStMessage failed: %+v", err)
return false, xerrors.Errorf("getting post commit randomness: %w", err)
}
params.ChainCommitEpoch = commEpoch
params.ChainCommitRand = commRand
var pbuf bytes.Buffer
if err := params.MarshalCBOR(&pbuf); err != nil {
return false, xerrors.Errorf("marshaling proof message: %w", err)
}
maddr, err := address.NewIDAddress(spID) maddr, err := address.NewIDAddress(spID)
if err != nil { if err != nil {
return false, xerrors.Errorf("invalid miner address: %w", err) return false, xerrors.Errorf("invalid miner address: %w", err)
@ -121,7 +144,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
msg := &types.Message{ msg := &types.Message{
To: maddr, To: maddr,
Method: builtin.MethodsMiner.SubmitWindowedPoSt, Method: builtin.MethodsMiner.SubmitWindowedPoSt,
Params: proofParamBytes, Params: pbuf.Bytes(),
Value: big.Zero(), Value: big.Zero(),
From: mi.Worker, // set worker for now for gas estimation From: mi.Worker, // set worker for now for gas estimation
@ -146,7 +169,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
// goodFunds = funds needed for optimal inclusion probability. // goodFunds = funds needed for optimal inclusion probability.
// minFunds = funds needed for more speculative inclusion probability. // minFunds = funds needed for more speculative inclusion probability.
goodFunds := big.Add(msg.RequiredFunds(), msg.Value) goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value)
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds) from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds)
@ -167,7 +190,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
// set message_cid in the wdpost_proofs entry // set message_cid in the wdpost_proofs entry
_, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and deadline = $3 and partition = $4`, smsg.String(), spID, deadline, partition) _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5`, smsg.String(), spID, pps, deadline, partition)
if err != nil { if err != nil {
return true, xerrors.Errorf("updating wdpost_proofs: %w", err) return true, xerrors.Errorf("updating wdpost_proofs: %w", err)
} }
@ -210,7 +233,7 @@ func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) {
func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
tf := w.submitPoStTF.Val(ctx) tf := w.submitPoStTF.Val(ctx)
qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) qry, err := w.db.Query(ctx, `select sp_id, proving_period_start, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height())
if err != nil { if err != nil {
return err return err
} }
@ -218,18 +241,19 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply
for qry.Next() { for qry.Next() {
var spID int64 var spID int64
var pps int64
var deadline uint64 var deadline uint64
var partition uint64 var partition uint64
var submitAtEpoch uint64 var submitAtEpoch uint64
if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil { if err := qry.Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch); err != nil {
return err return xerrors.Errorf("scan submittable posts: %w", err)
} }
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update in transaction iff submit_task_id is still null // update in transaction iff submit_task_id is still null
res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition) res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5 and submit_task_id is null`, id, spID, pps, deadline, partition)
if err != nil { if err != nil {
return false, err return false, xerrors.Errorf("query ready proof: %w", err)
} }
if res != 1 { if res != 1 {
return false, nil return false, nil