From edd95f6524a0957c6ada0f30c2419e9c44d97186 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 4 Nov 2023 12:32:27 +0100 Subject: [PATCH] lpwindow: IT SUBMITS --- lib/harmony/harmonydb/sql/20230823.sql | 19 +++---- lib/harmony/harmonytask/task_type_handler.go | 2 +- provider/lpmessage/sender.go | 10 ++-- provider/lpwindow/compute_task.go | 17 +++++-- provider/lpwindow/submit_task.go | 52 ++++++++++++++------ 5 files changed, 69 insertions(+), 31 deletions(-) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index a9486567c..d2d954c7c 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -19,16 +19,17 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit create table wdpost_proofs ( - sp_id bigint not null, - deadline bigint not null, - partition bigint not null, - submit_at_epoch bigint not null, - submit_by_epoch bigint not null, - proof_message bytea, + sp_id bigint not null, + proving_period_start bigint not null, + deadline bigint not null, + partition bigint not null, + submit_at_epoch bigint not null, + submit_by_epoch bigint not null, + proof_message bytea, - submit_task_id bigint not null, - message_cid text not null, + submit_task_id bigint, + message_cid text, constraint wdpost_proofs_identity_key - unique (sp_id, deadline, partition) + unique (sp_id, proving_period_start, deadline, partition) ); diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 6ab7a031a..09540913d 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -115,7 +115,7 @@ top: defer func() { if r := recover(); r != nil { - stackSlice := make([]byte, 512) + stackSlice := make([]byte, 4092) sz := runtime.Stack(stackSlice, false) log.Error("Recovered from a serious error "+ "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r, diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index b839f8003..f6bd0a8a7 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -77,6 +77,8 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("getting key address: %w", err) } + msg.From = fromA + if msg.Nonce != 0 { return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce) } @@ -107,14 +109,14 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS } // get nonce from db - var dbNonce uint64 + var dbNonce *uint64 r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) if err := r.Scan(&dbNonce); err != nil { return false, xerrors.Errorf("getting nonce from db: %w", err) } - if dbNonce+1 > msgNonce { - msgNonce = dbNonce + 1 + if dbNonce != nil && *dbNonce+1 > msgNonce { + msgNonce = *dbNonce + 1 } msg.Nonce = msgNonce @@ -169,5 +171,5 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) - return cid.Undef, nil + return sigMsg.Cid(), nil } diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index a5baeb20a..372de22fc 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -162,22 +162,33 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Insert into wdpost_proofs table - _, err = t.db.Exec(context.Background(), + n, err := t.db.Exec(context.Background(), `INSERT INTO wdpost_proofs ( sp_id, + proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message) - VALUES ($1, $2, $3, $4, $5, $6)`, + VALUES ($1, $2, $3, $4, $5, $6, $7)`, spID, + pps, deadline.Index, partIdx, deadline.Open, deadline.Close, msgbuf.Bytes()) + if err != nil { + log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err) + return false, err + } + if n != 1 { + log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err) + return false, err + } + return true, nil } @@ -379,7 +390,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden taskIdent.Partition_index, ) if err != nil { - return false, err + return false, xerrors.Errorf("insert partition task: %w", err) } return true, nil diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index dad7ca2f7..63f4e4651 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -3,6 +3,8 @@ package lpwindow import ( "bytes" "context" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/storage/wdpost" "golang.org/x/xerrors" @@ -32,6 +34,7 @@ type WdPoStSubmitTaskApi interface { StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) @@ -71,14 +74,14 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) var spID uint64 var deadline uint64 var partition uint64 - var submitAtEpoch, submitByEpoch abi.ChainEpoch - var proofParamBytes []byte + var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch + var earlyParamBytes []byte var dbTask uint64 err = w.db.QueryRow( - context.Background(), `select sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id - from wdpost_proofs where sp_id = $1 and deadline = $2 and partition = $3`, spID, deadline, partition, - ).Scan(&spID, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &proofParamBytes, &dbTask) + context.Background(), `select sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id + from wdpost_proofs where submit_task_id = $1`, taskID, + ).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask) if err != nil { return false, xerrors.Errorf("query post proof: %w", err) } @@ -103,11 +106,31 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch) } + dlInfo := wdpost.NewDeadlineInfo(pps, deadline, head.Height()) + var params miner.SubmitWindowedPoStParams - if err := params.UnmarshalCBOR(bytes.NewReader(proofParamBytes)); err != nil { + if err := params.UnmarshalCBOR(bytes.NewReader(earlyParamBytes)); err != nil { return false, xerrors.Errorf("unmarshaling proof message: %w", err) } + commEpoch := dlInfo.Challenge + + commRand, err := w.api.StateGetRandomnessFromTickets(context.Background(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil, head.Key()) + if err != nil { + err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (epoch=%d): %w", commEpoch, err) + log.Errorf("submitPoStMessage failed: %+v", err) + + return false, xerrors.Errorf("getting post commit randomness: %w", err) + } + + params.ChainCommitEpoch = commEpoch + params.ChainCommitRand = commRand + + var pbuf bytes.Buffer + if err := params.MarshalCBOR(&pbuf); err != nil { + return false, xerrors.Errorf("marshaling proof message: %w", err) + } + maddr, err := address.NewIDAddress(spID) if err != nil { return false, xerrors.Errorf("invalid miner address: %w", err) @@ -121,7 +144,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) msg := &types.Message{ To: maddr, Method: builtin.MethodsMiner.SubmitWindowedPoSt, - Params: proofParamBytes, + Params: pbuf.Bytes(), Value: big.Zero(), From: mi.Worker, // set worker for now for gas estimation @@ -146,7 +169,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // goodFunds = funds needed for optimal inclusion probability. // minFunds = funds needed for more speculative inclusion probability. - goodFunds := big.Add(msg.RequiredFunds(), msg.Value) + goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value) minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds) @@ -167,7 +190,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // set message_cid in the wdpost_proofs entry - _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and deadline = $3 and partition = $4`, smsg.String(), spID, deadline, partition) + _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5`, smsg.String(), spID, pps, deadline, partition) if err != nil { return true, xerrors.Errorf("updating wdpost_proofs: %w", err) } @@ -210,7 +233,7 @@ func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { tf := w.submitPoStTF.Val(ctx) - qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) + qry, err := w.db.Query(ctx, `select sp_id, proving_period_start, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) if err != nil { return err } @@ -218,18 +241,19 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply for qry.Next() { var spID int64 + var pps int64 var deadline uint64 var partition uint64 var submitAtEpoch uint64 - if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil { - return err + if err := qry.Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch); err != nil { + return xerrors.Errorf("scan submittable posts: %w", err) } tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { // update in transaction iff submit_task_id is still null - res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition) + res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5 and submit_task_id is null`, id, spID, pps, deadline, partition) if err != nil { - return false, err + return false, xerrors.Errorf("query ready proof: %w", err) } if res != 1 { return false, nil