This commit is contained in:
Shrenuj Bansal 2023-10-05 11:52:22 -04:00
parent 00b3335765
commit f01096bca3
7 changed files with 115 additions and 42 deletions

View File

@ -38,7 +38,7 @@ type TipSetKey struct {
// self-describing, wrapped as a string. // self-describing, wrapped as a string.
// These gymnastics make the a TipSetKey usable as a map key. // These gymnastics make the a TipSetKey usable as a map key.
// The empty key has value "". // The empty key has value "".
value string Value string
} }
// NewTipSetKey builds a new key from a slice of CIDs. // NewTipSetKey builds a new key from a slice of CIDs.
@ -59,7 +59,7 @@ func TipSetKeyFromBytes(encoded []byte) (TipSetKey, error) {
// Cids returns a slice of the CIDs comprising this key. // Cids returns a slice of the CIDs comprising this key.
func (k TipSetKey) Cids() []cid.Cid { func (k TipSetKey) Cids() []cid.Cid {
cids, err := decodeKey([]byte(k.value)) cids, err := decodeKey([]byte(k.Value))
if err != nil { if err != nil {
panic("invalid tipset key: " + err.Error()) panic("invalid tipset key: " + err.Error())
} }
@ -83,7 +83,7 @@ func (k TipSetKey) String() string {
// Bytes() returns a binary representation of the key. // Bytes() returns a binary representation of the key.
func (k TipSetKey) Bytes() []byte { func (k TipSetKey) Bytes() []byte {
return []byte(k.value) return []byte(k.Value)
} }
func (k TipSetKey) MarshalJSON() ([]byte, error) { func (k TipSetKey) MarshalJSON() ([]byte, error) {
@ -95,7 +95,7 @@ func (k *TipSetKey) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal(b, &cids); err != nil { if err := json.Unmarshal(b, &cids); err != nil {
return err return err
} }
k.value = string(encodeKey(cids)) k.Value = string(encodeKey(cids))
return nil return nil
} }
@ -161,7 +161,7 @@ func (k *TipSetKey) UnmarshalCBOR(reader io.Reader) error {
} }
func (k TipSetKey) IsEmpty() bool { func (k TipSetKey) IsEmpty() bool {
return len(k.value) == 0 return len(k.Value) == 0
} }
func encodeKey(cids []cid.Cid) []byte { func encodeKey(cids []cid.Cid) []byte {

View File

@ -211,7 +211,7 @@ var runCmd = &cli.Command{
return err return err
} }
wdPostTask := wdpost.NewWdPostTask(db) wdPostTask := wdpost.NewWdPostTask(db, nil)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address) taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address)
if err != nil { if err != nil {

View File

@ -1,13 +1,17 @@
create table wdpost_tasks create table wdpost_tasks
( (
task_id int not null, task_id int not null
tskey varchar, constraint wdpost_tasks_pkey
current_epoch bigint, primary key,
period_start bigint, tskey bytea not null,
index bigint, current_epoch bigint not null,
open bigint, period_start bigint not null,
close bigint, index bigint not null
challenge bigint, constraint wdpost_tasks_index_key
unique,
open bigint not null,
close bigint not null,
challenge bigint not null,
fault_cutoff bigint, fault_cutoff bigint,
wpost_period_deadlines bigint, wpost_period_deadlines bigint,
wpost_proving_period bigint, wpost_proving_period bigint,
@ -16,3 +20,4 @@ create table wdpost_tasks
fault_declaration_cutoff bigint fault_declaration_cutoff bigint
); );

View File

@ -170,7 +170,8 @@ type proveHandler2 struct {
processedHeadChanges chan *headChange processedHeadChanges chan *headChange
processedPostResults chan *postResult processedPostResults chan *postResult
wdPostTask *WdPostTask wdPostTask *WdPostTask
currDeadline *dline.Info
} }
func newProver2( func newProver2(
@ -223,20 +224,22 @@ func (p *proveHandler2) run() {
func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
// If the post window has expired, abort the current proof // If the post window has expired, abort the current proof
if p.current != nil && newTS.Height() >= p.current.di.Close { //if p.current != nil && newTS.Height() >= p.current.di.Close {
// Cancel the context on the current proof // log.Errorf("Aborted window post Proving (Deadline: %+v), newTs: %+v", p.current.di, newTS.Height())
p.current.abort() // // Cancel the context on the current proof
// p.current.abort()
// Clear out the reference to the proof so that we can immediately //
// start generating a new proof, without having to worry about state // // Clear out the reference to the proof so that we can immediately
// getting clobbered when the abort completes // // start generating a new proof, without having to worry about state
p.current = nil // // getting clobbered when the abort completes
} // p.current = nil
//}
// Only generate one proof at a time //
if p.current != nil { //// Only generate one proof at a time
return //log.Errorf("p.current: %+v", p.current)
} //if p.current != nil {
// return
//}
// If the proof for the current post window has been generated, check the // If the proof for the current post window has been generated, check the
// next post window // next post window
@ -246,17 +249,19 @@ func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipS
// _, complete = p.posts.get(di) // _, complete = p.posts.get(di)
//} //}
// Check if the chain is above the Challenge height for the post window
if newTS.Height() < di.Challenge+ChallengeConfidence {
return
}
//p.current = &currentPost{di: di}
err := p.wdPostTask.AddTask(ctx, newTS, di) err := p.wdPostTask.AddTask(ctx, newTS, di)
if err != nil { if err != nil {
log.Errorf("AddTask failed: %v", err) log.Errorf("AddTask failed: %v", err)
} }
//// Check if the chain is above the Challenge height for the post window
//if newTS.Height() < di.Challenge+ChallengeConfidence {
// return
//}
// //
//p.current = &currentPost{di: di}
//curr := p.current //curr := p.current
//p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) { //p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
// p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err} // p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}

View File

@ -270,6 +270,8 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di
start := time.Now() start := time.Now()
log.Errorf("runPoStCycle called with manual: %v, di: %v, ts: %v", manual, di, ts)
log := log.WithOptions(zap.Fields(zap.Time("cycle", start))) log := log.WithOptions(zap.Fields(zap.Time("cycle", start)))
log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index) log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index)
defer func() { defer func() {
@ -311,6 +313,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di
// allowed in a single message // allowed in a single message
partitionBatches, err := s.BatchPartitions(partitions, nv) partitionBatches, err := s.BatchPartitions(partitions, nv)
if err != nil { if err != nil {
log.Errorf("batch partitions failed: %+v", err)
return nil, err return nil, err
} }

View File

@ -37,6 +37,7 @@ var log = logging.Logger("wdpost")
type NodeAPI interface { type NodeAPI interface {
ChainHead(context.Context) (*types.TipSet, error) ChainHead(context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error) ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
@ -139,7 +140,7 @@ func NewWindowedPoStScheduler(api NodeAPI,
func (s *WindowPoStScheduler) Run(ctx context.Context) { func (s *WindowPoStScheduler) Run(ctx context.Context) {
// Initialize change handler. // Initialize change handler.
wdPostTask := NewWdPostTask(s.db) wdPostTask := NewWdPostTask(s.db, s)
taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
if er != nil { if er != nil {

View File

@ -6,6 +6,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"time"
) )
type WdPostTaskDetails struct { type WdPostTaskDetails struct {
@ -14,13 +15,69 @@ type WdPostTaskDetails struct {
} }
type WdPostTask struct { type WdPostTask struct {
tasks chan *WdPostTaskDetails tasks chan *WdPostTaskDetails
db *harmonydb.DB db *harmonydb.DB
scheduler *WindowPoStScheduler
} }
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
time.Sleep(5 * time.Second)
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
var tsKeyBytes []byte
var deadline dline.Info
err = t.db.QueryRow(context.Background(),
`Select tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
from wdpost_tasks
where task_id = $1`, taskID).Scan(
&tsKeyBytes,
&deadline.CurrentEpoch,
&deadline.PeriodStart,
&deadline.Index,
&deadline.Open,
&deadline.Close,
&deadline.Challenge,
&deadline.FaultCutoff,
&deadline.WPoStPeriodDeadlines,
&deadline.WPoStProvingPeriod,
&deadline.WPoStChallengeWindow,
&deadline.WPoStChallengeLookback,
&deadline.FaultDeclarationCutoff,
)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false, err
}
log.Errorf("tskEY: %v", tsKeyBytes)
tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
ts, err := t.scheduler.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
return false, err
}
submitWdPostParams, err := t.scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
return false, err
}
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
return true, nil return true, nil
} }
@ -31,6 +88,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, e
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails { func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{ return harmonytask.TaskTypeDetails{
Name: "WdPostCompute", Name: "WdPostCompute",
Max: -1,
} }
} }
@ -49,10 +107,11 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
} }
} }
func NewWdPostTask(db *harmonydb.DB) *WdPostTask { func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler) *WdPostTask {
return &WdPostTask{ return &WdPostTask{
tasks: make(chan *WdPostTaskDetails, 2), tasks: make(chan *WdPostTaskDetails, 2),
db: db, db: db,
scheduler: scheduler,
} }
} }
@ -92,7 +151,7 @@ func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId
fault_declaration_cutoff fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId, taskId,
tsKey.String(), tsKey.Bytes(),
deadline.CurrentEpoch, deadline.CurrentEpoch,
deadline.PeriodStart, deadline.PeriodStart,
deadline.Index, deadline.Index,
@ -134,7 +193,7 @@ func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline
fault_declaration_cutoff fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`, ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId, taskId,
tsKey.String(), tsKey.Bytes(),
deadline.CurrentEpoch, deadline.CurrentEpoch,
deadline.PeriodStart, deadline.PeriodStart,
deadline.Index, deadline.Index,