lpwindow: Submit task Adder

This commit is contained in:
Łukasz Magiera 2023-11-03 21:53:15 +01:00
parent ebec992ba8
commit 81603a67f3
3 changed files with 68 additions and 41 deletions

View File

@ -24,5 +24,10 @@ create table wdpost_proofs
partition bigint not null, partition bigint not null,
submit_at_epoch bigint not null, submit_at_epoch bigint not null,
submit_by_epoch bigint not null, submit_by_epoch bigint not null,
proof_message bytea proof_message bytea,
submit_task_id bigint not null,
constraint wdpost_proofs_identity_key
unique (sp_id, deadline, partition)
); );

View File

@ -149,43 +149,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
deadline.Close, deadline.Close,
msgbuf.Bytes()) msgbuf.Bytes())
/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
return false, err
}
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
// Enter an entry for each wdpost message proof into the wdpost_proofs table
for _, params := range submitWdPostParams {
// Convert submitWdPostParams.Partitions to a byte array using CBOR
buf := new(bytes.Buffer)
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
return false, err
}
for _, v := range params.Partitions {
if err := v.MarshalCBOR(buf); err != nil {
return false, err
}
}
// Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs (
deadline,
partitions,
proof_type,
proof_bytes)
VALUES ($1, $2, $3, $4)`,
params.Deadline,
buf.Bytes(),
params.Proofs[0].PoStProof,
params.Proofs[0].ProofBytes)
}*/
return true, nil return true, nil
} }

View File

@ -1,11 +1,21 @@
package lpwindow package lpwindow
import ( import (
"context"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage"
) )
type WdPostSubmitTask struct { type WdPostSubmitTask struct {
sender *lpmessage.Sender
db *harmonydb.DB
submitPoStTF promise.Promise[harmonytask.AddTaskFunc]
} }
func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
@ -18,6 +28,18 @@ func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta
panic("implement me") panic("implement me")
} }
func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) {
res := &WdPostSubmitTask{
sender: send,
}
if err := pcs.AddHandler(res.processHeadChange); err != nil {
return nil, err
}
return res, nil
}
func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{ return harmonytask.TaskTypeDetails{
Max: 128, Max: 128,
@ -25,7 +47,7 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
Cost: resources.Resources{ Cost: resources.Resources{
Cpu: 0, Cpu: 0,
Gpu: 0, Gpu: 0,
Ram: 0, Ram: 10 << 20,
}, },
MaxFailures: 10, MaxFailures: 10,
Follows: nil, // ?? Follows: nil, // ??
@ -33,8 +55,45 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
} }
func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) {
//TODO implement me w.submitPoStTF.Set(taskFunc)
panic("implement me") }
func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
tf := w.submitPoStTF.Val(ctx)
qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height())
if err != nil {
return err
}
defer qry.Close()
for qry.Next() {
var spID int64
var deadline uint64
var partition uint64
var submitAtEpoch uint64
if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil {
return err
}
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update in transaction iff submit_task_id is still null
res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition)
if err != nil {
return false, err
}
if res != 1 {
return false, nil
}
return true, nil
})
}
if err := qry.Err(); err != nil {
return err
}
return nil
} }
var _ harmonytask.TaskInterface = &WdPostSubmitTask{} var _ harmonytask.TaskInterface = &WdPostSubmitTask{}