Merge remote-tracking branch 'origin/feat/wdpost-adder' into feat/wdpost-adder2

This commit is contained in:
Łukasz Magiera 2023-10-19 17:56:21 +02:00
commit 4e68fd674b
2 changed files with 45 additions and 5 deletions

View File

@ -20,4 +20,15 @@ create table wdpost_tasks
fault_declaration_cutoff bigint fault_declaration_cutoff bigint
); );
create table wdpost_proofs
(
deadline bigint not null,
partitions bytea not null,
proof_type bigint,
proof_bytes bytea,
chain_commit_epoch bigint,
chain_commit_rand bytea
);

View File

@ -1,6 +1,7 @@
package wdpost package wdpost
import ( import (
"bytes"
"context" "context"
"sort" "sort"
"time" "time"
@ -15,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/samber/lo" "github.com/samber/lo"
cbg "github.com/whyrusleeping/cbor-gen"
) )
type WdPostTaskDetails struct { type WdPostTaskDetails struct {
@ -102,6 +104,35 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams) log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
// Enter an entry for each wdpost message proof into the wdpost_proofs table
for _, params := range submitWdPostParams {
// Convert submitWdPostParams.Partitions to a byte array using CBOR
buf := new(bytes.Buffer)
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
return false, err
}
for _, v := range params.Partitions {
if err := v.MarshalCBOR(buf); err != nil {
return false, err
}
}
// Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs (
deadline,
partitions,
proof_type,
proof_bytes)
VALUES ($1, $2, $3, $4)`,
params.Deadline,
buf.Bytes(),
params.Proofs[0].PoStProof,
params.Proofs[0].ProofBytes)
}
return true, nil return true, nil
} }
@ -195,12 +226,10 @@ func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
log.Errorf("WdPostTask.Adder() called ----------------------------- ")
// wait for any channels on t.tasks and call taskFunc on them // wait for any channels on t.tasks and call taskFunc on them
for taskDetails := range t.tasks { for taskDetails := range t.tasks {
log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails) //log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails)
taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx) return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx)
@ -224,7 +253,7 @@ func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dl
Deadline: deadline, Deadline: deadline,
} }
log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks) //log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks)
return nil return nil
} }
@ -233,7 +262,7 @@ func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId
tsKey := ts.Key() tsKey := ts.Key()
log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId) //log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId)
_, err := tx.Exec( _, err := tx.Exec(
`INSERT INTO wdpost_tasks ( `INSERT INTO wdpost_tasks (