diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index c16294bf2..fa402dbd3 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -24,5 +24,10 @@ create table wdpost_proofs partition bigint not null, submit_at_epoch bigint not null, submit_by_epoch bigint not null, - proof_message bytea + proof_message bytea, + + submit_task_id bigint not null, + + constraint wdpost_proofs_identity_key + unique (sp_id, deadline, partition) ); diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 0e59d5d2b..05cc9ae32 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -149,43 +149,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done deadline.Close, msgbuf.Bytes()) - /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) - if err != nil { - log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err) - return false, err - } - - log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams) - - // Enter an entry for each wdpost message proof into the wdpost_proofs table - for _, params := range submitWdPostParams { - - // Convert submitWdPostParams.Partitions to a byte array using CBOR - buf := new(bytes.Buffer) - scratch := make([]byte, 9) - if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil { - return false, err - } - for _, v := range params.Partitions { - if err := v.MarshalCBOR(buf); err != nil { - return false, err - } - } - - // Insert into wdpost_proofs table - _, err = t.db.Exec(context.Background(), - `INSERT INTO wdpost_proofs ( - deadline, - partitions, - proof_type, - proof_bytes) - VALUES ($1, $2, $3, $4)`, - params.Deadline, - buf.Bytes(), - params.Proofs[0].PoStProof, - params.Proofs[0].ProofBytes) - }*/ - return true, nil } diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index de0ef94c9..2369a6759 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -1,11 +1,21 @@ package lpwindow import ( + "context" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/filecoin-project/lotus/provider/lpmessage" ) type WdPostSubmitTask struct { + sender *lpmessage.Sender + db *harmonydb.DB + + submitPoStTF promise.Promise[harmonytask.AddTaskFunc] } func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { @@ -18,6 +28,18 @@ func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta panic("implement me") } +func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) { + res := &WdPostSubmitTask{ + sender: send, + } + + if err := pcs.AddHandler(res.processHeadChange); err != nil { + return nil, err + } + + return res, nil +} + func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, @@ -25,7 +47,7 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { Cost: resources.Resources{ Cpu: 0, Gpu: 0, - Ram: 0, + Ram: 10 << 20, }, MaxFailures: 10, Follows: nil, // ?? @@ -33,8 +55,45 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { } func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { - //TODO implement me - panic("implement me") + w.submitPoStTF.Set(taskFunc) +} + +func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { + tf := w.submitPoStTF.Val(ctx) + + qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) + if err != nil { + return err + } + defer qry.Close() + + for qry.Next() { + var spID int64 + var deadline uint64 + var partition uint64 + var submitAtEpoch uint64 + if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil { + return err + } + + tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + // update in transaction iff submit_task_id is still null + res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition) + if err != nil { + return false, err + } + if res != 1 { + return false, nil + } + + return true, nil + }) + } + if err := qry.Err(); err != nil { + return err + } + + return nil } var _ harmonytask.TaskInterface = &WdPostSubmitTask{}