Merge branch 'feat/wdpost-adder' into wdpost-can-accept

This commit is contained in:
Andrew Jackson (Ajax) 2023-10-06 11:48:04 -05:00
commit 598e9b931d
13 changed files with 925 additions and 25 deletions

View File

@ -38,7 +38,7 @@ type TipSetKey struct {
// self-describing, wrapped as a string.
// These gymnastics make the a TipSetKey usable as a map key.
// The empty key has value "".
value string
Value string
}
// NewTipSetKey builds a new key from a slice of CIDs.
@ -59,7 +59,7 @@ func TipSetKeyFromBytes(encoded []byte) (TipSetKey, error) {
// Cids returns a slice of the CIDs comprising this key.
func (k TipSetKey) Cids() []cid.Cid {
cids, err := decodeKey([]byte(k.value))
cids, err := decodeKey([]byte(k.Value))
if err != nil {
panic("invalid tipset key: " + err.Error())
}
@ -83,7 +83,7 @@ func (k TipSetKey) String() string {
// Bytes() returns a binary representation of the key.
func (k TipSetKey) Bytes() []byte {
return []byte(k.value)
return []byte(k.Value)
}
func (k TipSetKey) MarshalJSON() ([]byte, error) {
@ -95,7 +95,7 @@ func (k *TipSetKey) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal(b, &cids); err != nil {
return err
}
k.value = string(encodeKey(cids))
k.Value = string(encodeKey(cids))
return nil
}
@ -161,7 +161,7 @@ func (k *TipSetKey) UnmarshalCBOR(reader io.Reader) error {
}
func (k TipSetKey) IsEmpty() bool {
return len(k.value) == 0
return len(k.Value) == 0
}
func encodeKey(cids []cid.Cid) []byte {

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/storage/wdpost"
"net"
"os"
"strings"
@ -176,7 +177,9 @@ var runCmd = &cli.Command{
}
_ = lp // here is where the config feeds into task runners
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{}, address)
wdPostTask := wdpost.NewWdPostTask(db, nil)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address)
if err != nil {
return err
}

View File

@ -18,7 +18,7 @@ CREATE TABLE harmony_task (
owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL,
added_by INTEGER NOT NULL,
previous_task INTEGER,
name varchar(8) NOT NULL
name varchar(16) NOT NULL
);
COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.';
COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.';
@ -29,7 +29,7 @@ COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified. not a
CREATE TABLE harmony_task_history (
id SERIAL PRIMARY KEY NOT NULL,
task_id INTEGER NOT NULL,
name VARCHAR(8) NOT NULL,
name VARCHAR(16) NOT NULL,
posted TIMESTAMP NOT NULL,
work_start TIMESTAMP NOT NULL,
work_end TIMESTAMP NOT NULL,
@ -41,12 +41,12 @@ COMMENT ON COLUMN harmony_task_history.result IS 'Use to detemine if this was a
CREATE TABLE harmony_task_follow (
id SERIAL PRIMARY KEY NOT NULL,
owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
to_type VARCHAR(8) NOT NULL,
from_type VARCHAR(8) NOT NULL
to_type VARCHAR(16) NOT NULL,
from_type VARCHAR(16) NOT NULL
);
CREATE TABLE harmony_task_impl (
id SERIAL PRIMARY KEY NOT NULL,
owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE,
name VARCHAR(8) NOT NULL
name VARCHAR(16) NOT NULL
);

View File

@ -0,0 +1,23 @@
create table wdpost_tasks
(
task_id int not null
constraint wdpost_tasks_pkey
primary key,
tskey bytea not null,
current_epoch bigint not null,
period_start bigint not null,
index bigint not null
constraint wdpost_tasks_index_key
unique,
open bigint not null,
close bigint not null,
challenge bigint not null,
fault_cutoff bigint,
wpost_period_deadlines bigint,
wpost_proving_period bigint,
wpost_challenge_window bigint,
wpost_challenge_lookback bigint,
fault_declaration_cutoff bigint
);

View File

@ -54,7 +54,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
if err != nil {
return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
}
gpuram := lo.Sum(reg.GpuRam)
gpuram := uint64(lo.Sum(reg.GpuRam))
if len(ownerID) == 0 {
err = db.QueryRow(ctx, `INSERT INTO harmony_machines
(host_and_port, cpu, ram, gpu, gpuram) VALUES

View File

@ -103,6 +103,10 @@ func ConfigStorageMiner(c interface{}) Option {
If(cfg.Subsystems.EnableSealing, Error(xerrors.Errorf("sealing can only be enabled on a mining node"))),
If(cfg.Subsystems.EnableSectorStorage, Error(xerrors.Errorf("sealing can only be enabled on a mining node"))),
),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
If(cfg.Subsystems.EnableMining,
If(!cfg.Subsystems.EnableSealing, Error(xerrors.Errorf("sealing can't be disabled on a mining node yet"))),
If(!cfg.Subsystems.EnableSectorStorage, Error(xerrors.Errorf("sealing can't be disabled on a mining node yet"))),
@ -126,10 +130,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
If(cfg.Subsystems.EnableSectorStorage,
// Sector storage
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)),

View File

@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"net/http"
"os"
"path/filepath"
@ -300,8 +301,8 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
}
}
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
return func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
return func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
var (
mctx = params.MetricsCtx
lc = params.Lifecycle
@ -315,7 +316,18 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
ctx := helpers.LifecycleCtx(mctx, lc)
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr)
//wdPostTask := wdpost.NewWdPostTask(db)
//taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
//if err != nil {
// return nil, xerrors.Errorf("failed to create task engine: %w", err)
//}
////handler := gin.New()
////
////taskEngine.ApplyHttpHandlers(handler.Group("/"))
//defer taskEngine.GracefullyTerminate(time.Hour)
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr, db, nil)
if err != nil {
return nil, err

View File

@ -31,6 +31,15 @@ type wdPoStCommands interface {
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
}
type changeHandlerIface interface {
start()
update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error
shutdown()
currentTSDI() (*types.TipSet, *dline.Info)
}
var _ changeHandlerIface = &changeHandler{}
type changeHandler struct {
api wdPoStCommands
actor address.Address
@ -162,6 +171,8 @@ type proveHandler struct {
// Used for testing
processedHeadChanges chan *headChange
processedPostResults chan *postResult
wdPostTask *WdPostTask
}
func newProver(
@ -211,6 +222,8 @@ func (p *proveHandler) run() {
func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
// If the post window has expired, abort the current proof
//log.Errorf("--------------------WINDOW POST CHANGE HANDLER PROCESS HC----------------------")
if p.current != nil && newTS.Height() >= p.current.di.Close {
// Cancel the context on the current proof
p.current.abort()
@ -234,6 +247,11 @@ func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSe
_, complete = p.posts.get(di)
}
//err := p.wdPostTask.AddTask(ctx, newTS, di)
//if err != nil {
// log.Errorf("AddTask failed: %v", err)
//}
// Check if the chain is above the Challenge height for the post window
if newTS.Height() < di.Challenge+ChallengeConfidence {
return

View File

@ -0,0 +1,564 @@
package wdpost
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/chain/types"
)
//const (
// SubmitConfidence = 4
// ChallengeConfidence = 1
//)
//
//type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
//type CompleteSubmitPoSTCb func(err error)
//
//// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
//// by the changeHandler to execute actions and query state.
//type wdPoStCommands interface {
// StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
//
// startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
// startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
// onAbort(ts *types.TipSet, deadline *dline.Info)
// recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
//}
var _ changeHandlerIface = &changeHandler2{}
type changeHandler2 struct {
api wdPoStCommands
actor address.Address
proveHdlr *proveHandler2
//submitHdlr *submitHandler
db *harmonydb.DB
}
func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
//TODO implement me
panic("implement me")
}
func newChangeHandler2(api wdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 {
log.Errorf("newChangeHandler2() called with api: %v, actor: %v", api, actor)
//posts := newPostsCache()
p := newProver2(api, task)
//s := newSubmitter(api, posts)
return &changeHandler2{api: api, actor: actor, proveHdlr: p}
}
func (ch *changeHandler2) start() {
go ch.proveHdlr.run()
//go ch.submitHdlr.run()
}
func (ch *changeHandler2) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
// Get the current deadline period
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
if err != nil {
return err
}
if !di.PeriodStarted() {
return nil // not proving anything yet
}
hc := &headChange{
ctx: ctx,
revert: revert,
advance: advance,
di: di,
}
select {
case ch.proveHdlr.hcs <- hc:
case <-ch.proveHdlr.shutdownCtx.Done():
case <-ctx.Done():
}
//select {
//case ch.submitHdlr.hcs <- hc:
//case <-ch.submitHdlr.shutdownCtx.Done():
//case <-ctx.Done():
//}
return nil
}
func (ch *changeHandler2) shutdown() {
ch.proveHdlr.shutdown()
//ch.submitHdlr.shutdown()
}
//func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
// return ch.submitHdlr.currentTSDI()
//}
// postsCache keeps a cache of PoSTs for each proving window
//type postsCache struct {
// added chan *postInfo
// lk sync.RWMutex
// cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams
//}
//func newPostsCache() *postsCache {
// return &postsCache{
// added: make(chan *postInfo, 16),
// cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams),
// }
//}
//func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) {
// c.lk.Lock()
// defer c.lk.Unlock()
//
// // TODO: clear cache entries older than chain finality
// c.cache[di.Open] = posts
//
// c.added <- &postInfo{
// di: di,
// posts: posts,
// }
//}
//
//func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) {
// c.lk.RLock()
// defer c.lk.RUnlock()
//
// posts, ok := c.cache[di.Open]
// return posts, ok
//}
//type headChange struct {
// ctx context.Context
// revert *types.TipSet
// advance *types.TipSet
// di *dline.Info
//}
//
//type currentPost struct {
// di *dline.Info
// abort context.CancelFunc
//}
//
//type postResult struct {
// ts *types.TipSet
// currPost *currentPost
// posts []miner.SubmitWindowedPoStParams
// err error
//}
// proveHandler generates proofs
type proveHandler2 struct {
api wdPoStCommands
//posts *postsCache
//postResults chan *postResult
hcs chan *headChange
current *currentPost
shutdownCtx context.Context
shutdown context.CancelFunc
// Used for testing
processedHeadChanges chan *headChange
processedPostResults chan *postResult
wdPostTask *WdPostTask
currDeadline *dline.Info
}
func newProver2(
api wdPoStCommands,
//posts *postsCache,
//db *harmonydb.DB,
wdPostTask *WdPostTask,
) *proveHandler2 {
ctx, cancel := context.WithCancel(context.Background())
return &proveHandler2{
api: api,
//posts: posts,
//postResults: make(chan *postResult),
hcs: make(chan *headChange),
shutdownCtx: ctx,
shutdown: cancel,
wdPostTask: wdPostTask,
}
}
func (p *proveHandler2) run() {
// Abort proving on shutdown
defer func() {
if p.current != nil {
p.current.abort()
}
}()
for p.shutdownCtx.Err() == nil {
select {
case <-p.shutdownCtx.Done():
return
case hc := <-p.hcs:
// Head changed
p.processHeadChange(hc.ctx, hc.advance, hc.di)
if p.processedHeadChanges != nil {
p.processedHeadChanges <- hc
}
//case res := <-p.postResults:
// // Proof generation complete
// p.processPostResult(res)
// if p.processedPostResults != nil {
// p.processedPostResults <- res
// }
}
}
}
func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
// If the post window has expired, abort the current proof
//if p.current != nil && newTS.Height() >= p.current.di.Close {
// log.Errorf("Aborted window post Proving (Deadline: %+v), newTs: %+v", p.current.di, newTS.Height())
// // Cancel the context on the current proof
// p.current.abort()
//
// // Clear out the reference to the proof so that we can immediately
// // start generating a new proof, without having to worry about state
// // getting clobbered when the abort completes
// p.current = nil
//}
//
//// Only generate one proof at a time
//log.Errorf("p.current: %+v", p.current)
//if p.current != nil {
// return
//}
// If the proof for the current post window has been generated, check the
// next post window
//_, complete := p.posts.get(di)
//for complete {
// di = nextDeadline(di)
// _, complete = p.posts.get(di)
//}
// Check if the chain is above the Challenge height for the post window
if newTS.Height() < di.Challenge+ChallengeConfidence {
return
}
//p.current = &currentPost{di: di}
err := p.wdPostTask.AddTask(ctx, newTS, di)
if err != nil {
log.Errorf("AddTask failed: %v", err)
}
//
//curr := p.current
//p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
// p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
//})
}
//func (p *proveHandler) processPostResult(res *postResult) {
// di := res.currPost.di
// if res.err != nil {
// // Proving failed so inform the API
// p.api.recordPoStFailure(res.err, res.ts, di)
// log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
// p.api.onAbort(res.ts, di)
//
// // Check if the current post has already been aborted
// if p.current == res.currPost {
// // If the current post was not already aborted, setting it to nil
// // marks it as complete so that a new post can be started
// p.current = nil
// }
// return
// }
//
// // Completed processing this proving window
// p.current = nil
//
// // Add the proofs to the cache
// p.posts.add(di, res.posts)
//}
//
//type submitResult struct {
// pw *postWindow
// err error
//}
//
//type SubmitState string
//
//const (
// SubmitStateStart SubmitState = "SubmitStateStart"
// SubmitStateSubmitting SubmitState = "SubmitStateSubmitting"
// SubmitStateComplete SubmitState = "SubmitStateComplete"
//)
//
//type postWindow struct {
// ts *types.TipSet
// di *dline.Info
// submitState SubmitState
// abort context.CancelFunc
//}
//
//type postInfo struct {
// di *dline.Info
// posts []miner.SubmitWindowedPoStParams
//}
//
//// submitHandler submits proofs on-chain
//type submitHandler struct {
// api wdPoStCommands
// posts *postsCache
//
// submitResults chan *submitResult
// hcs chan *headChange
//
// postWindows map[abi.ChainEpoch]*postWindow
// getPostWindowReqs chan *getPWReq
//
// shutdownCtx context.Context
// shutdown context.CancelFunc
//
// currentCtx context.Context
// currentTS *types.TipSet
// currentDI *dline.Info
// getTSDIReq chan chan *tsdi
//
// // Used for testing
// processedHeadChanges chan *headChange
// processedSubmitResults chan *submitResult
// processedPostReady chan *postInfo
//}
//
//func newSubmitter(
// api wdPoStCommands,
// posts *postsCache,
//) *submitHandler {
// ctx, cancel := context.WithCancel(context.Background())
// return &submitHandler{
// api: api,
// posts: posts,
// submitResults: make(chan *submitResult),
// hcs: make(chan *headChange),
// postWindows: make(map[abi.ChainEpoch]*postWindow),
// getPostWindowReqs: make(chan *getPWReq),
// getTSDIReq: make(chan chan *tsdi),
// shutdownCtx: ctx,
// shutdown: cancel,
// }
//}
//
//func (s *submitHandler) run() {
// // On shutdown, abort in-progress submits
// defer func() {
// for _, pw := range s.postWindows {
// if pw.abort != nil {
// pw.abort()
// }
// }
// }()
//
// for s.shutdownCtx.Err() == nil {
// select {
// case <-s.shutdownCtx.Done():
// return
//
// case hc := <-s.hcs:
// // Head change
// s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di)
// if s.processedHeadChanges != nil {
// s.processedHeadChanges <- hc
// }
//
// case pi := <-s.posts.added:
// // Proof generated
// s.processPostReady(pi)
// if s.processedPostReady != nil {
// s.processedPostReady <- pi
// }
//
// case res := <-s.submitResults:
// // Submit complete
// s.processSubmitResult(res)
// if s.processedSubmitResults != nil {
// s.processedSubmitResults <- res
// }
//
// case pwreq := <-s.getPostWindowReqs:
// // used by getPostWindow() to sync with run loop
// pwreq.out <- s.postWindows[pwreq.di.Open]
//
// case out := <-s.getTSDIReq:
// // used by currentTSDI() to sync with run loop
// out <- &tsdi{ts: s.currentTS, di: s.currentDI}
// }
// }
//}
//
//// processHeadChange is called when the chain head changes
//func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
// s.currentCtx = ctx
// s.currentTS = advance
// s.currentDI = di
//
// // Start tracking the current post window if we're not already
// // TODO: clear post windows older than chain finality
// if _, ok := s.postWindows[di.Open]; !ok {
// s.postWindows[di.Open] = &postWindow{
// di: di,
// ts: advance,
// submitState: SubmitStateStart,
// }
// }
//
// // Apply the change to all post windows
// for _, pw := range s.postWindows {
// s.processHeadChangeForPW(ctx, revert, advance, pw)
// }
//}
//
//func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) {
// revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open
// expired := advance.Height() >= pw.di.Close
//
// // If the chain was reverted back to the previous deadline, or if the post
// // window has expired, abort submit
// if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) {
// // Replace the aborted postWindow with a new one so that we can
// // submit again at any time without the state getting clobbered
// // when the abort completes
// abort := pw.abort
// if abort != nil {
// pw = &postWindow{
// di: pw.di,
// ts: advance,
// submitState: SubmitStateStart,
// }
// s.postWindows[pw.di.Open] = pw
//
// // Abort the current submit
// abort()
// }
// } else if pw.submitState == SubmitStateComplete && revertedToPrevDL {
// // If submit for this deadline has completed, but the chain was
// // reverted back to the previous deadline, reset the submit state to the
// // starting state, so that it can be resubmitted
// pw.submitState = SubmitStateStart
// }
//
// // Submit the proof to chain if the proof has been generated and the chain
// // height is above confidence
// s.submitIfReady(ctx, advance, pw)
//}
//
//// processPostReady is called when a proof generation completes
//func (s *submitHandler) processPostReady(pi *postInfo) {
// pw, ok := s.postWindows[pi.di.Open]
// if ok {
// s.submitIfReady(s.currentCtx, s.currentTS, pw)
// }
//}
//
//// submitIfReady submits a proof if the chain is high enough and the proof
//// has been generated for this deadline
//func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) {
// // If the window has expired, there's nothing more to do.
// if advance.Height() >= pw.di.Close {
// return
// }
//
// // Check if we're already submitting, or already completed submit
// if pw.submitState != SubmitStateStart {
// return
// }
//
// // Check if we've reached the confidence height to submit
// if advance.Height() < pw.di.Open+SubmitConfidence {
// return
// }
//
// // Check if the proofs have been generated for this deadline
// posts, ok := s.posts.get(pw.di)
// if !ok {
// return
// }
//
// // If there was nothing to prove, move straight to the complete state
// if len(posts) == 0 {
// pw.submitState = SubmitStateComplete
// return
// }
//
// // Start submitting post
// pw.submitState = SubmitStateSubmitting
// pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) {
// s.submitResults <- &submitResult{pw: pw, err: err}
// })
//}
//
//// processSubmitResult is called with the response to a submit
//func (s *submitHandler) processSubmitResult(res *submitResult) {
// if res.err != nil {
// // Submit failed so inform the API and go back to the start state
// s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
// log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
// s.api.onAbort(res.pw.ts, res.pw.di)
//
// res.pw.submitState = SubmitStateStart
// return
// }
//
// // Submit succeeded so move to complete state
// res.pw.submitState = SubmitStateComplete
//}
//
//type tsdi struct {
// ts *types.TipSet
// di *dline.Info
//}
//
//func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) {
// out := make(chan *tsdi)
// s.getTSDIReq <- out
// res := <-out
// return res.ts, res.di
//}
//
//type getPWReq struct {
// di *dline.Info
// out chan *postWindow
//}
//
//func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
// out := make(chan *postWindow)
// s.getPostWindowReqs <- &getPWReq{di: di, out: out}
// return <-out
//}
//
//// nextDeadline gets deadline info for the subsequent deadline
//func nextDeadline(currentDeadline *dline.Info) *dline.Info {
// periodStart := currentDeadline.PeriodStart
// newDeadline := currentDeadline.Index + 1
// if newDeadline == miner.WPoStPeriodDeadlines {
// newDeadline = 0
// periodStart = periodStart + miner.WPoStProvingPeriod
// }
//
// return NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch)
//}
//
//func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
// return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod, miner.WPoStChallengeWindow, miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
//}

View File

@ -272,6 +272,8 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di
start := time.Now()
log.Errorf("runPoStCycle called with manual: %v, di: %v, ts: %v", manual, di, ts)
log := log.WithOptions(zap.Fields(zap.Time("cycle", start)))
log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index)
defer func() {
@ -313,6 +315,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di
// allowed in a single message
partitionBatches, err := s.BatchPartitions(partitions, nv)
if err != nil {
log.Errorf("batch partitions failed: %+v", err)
return nil, err
}

View File

@ -2,6 +2,9 @@ package wdpost
import (
"context"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/gin-gonic/gin"
"time"
"github.com/ipfs/go-cid"
@ -34,6 +37,7 @@ var log = logging.Logger("wdpost")
type NodeAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
@ -76,7 +80,8 @@ type WindowPoStScheduler struct {
maxPartitionsPerPostMessage int
maxPartitionsPerRecoveryMessage int
singleRecoveringPartitionPerPostMessage bool
ch *changeHandler
ch changeHandlerIface
//ch2 *changeHandler2
actor address.Address
@ -85,6 +90,8 @@ type WindowPoStScheduler struct {
// failed abi.ChainEpoch // eps
// failLk sync.Mutex
db *harmonydb.DB
wdPostTask *WdPostTask
}
// NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler.
@ -96,7 +103,9 @@ func NewWindowedPoStScheduler(api NodeAPI,
verif storiface.Verifier,
ft sealer.FaultTracker,
j journal.Journal,
actor address.Address) (*WindowPoStScheduler, error) {
actor address.Address,
db *harmonydb.DB,
task *WdPostTask) (*WindowPoStScheduler, error) {
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
@ -122,22 +131,44 @@ func NewWindowedPoStScheduler(api NodeAPI,
evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"),
evtTypeWdPoStFaults: j.RegisterEventType("wdpost", "faults_processed"),
},
journal: j,
journal: j,
wdPostTask: task,
db: db,
}, nil
}
func (s *WindowPoStScheduler) Run(ctx context.Context) {
// Initialize change handler.
wdPostTask := NewWdPostTask(s.db, s)
taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
if er != nil {
//return nil, xerrors.Errorf("failed to create task engine: %w", err)
log.Errorf("failed to create task engine: %w", er)
}
handler := gin.New()
taskEngine.ApplyHttpHandlers(handler.Group("/"))
defer taskEngine.GracefullyTerminate(time.Hour)
// callbacks is a union of the fullNodeFilteredAPI and ourselves.
callbacks := struct {
NodeAPI
*WindowPoStScheduler
}{s.api, s}
s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown()
s.ch.start()
run_on_lotus_provider := true
if !run_on_lotus_provider {
s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown()
s.ch.start()
} else {
s.ch = newChangeHandler2(callbacks, s.actor, wdPostTask)
defer s.ch.shutdown()
s.ch.start()
}
var (
notifs <-chan []*api.HeadChange
@ -222,6 +253,11 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
if err != nil {
log.Errorf("handling head updates in window post sched: %+v", err)
}
//err = s.ch2.update(ctx, revert, apply)
//if err != nil {
// log.Errorf("handling head updates in window post sched: %+v", err)
//}
}
// onAbort is called when generating proofs or submitting proofs is aborted

View File

@ -0,0 +1,217 @@
package wdpost
import (
"context"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"time"
)
type WdPostTaskDetails struct {
Ts *types.TipSet
Deadline *dline.Info
}
type WdPostTask struct {
tasks chan *WdPostTaskDetails
db *harmonydb.DB
scheduler *WindowPoStScheduler
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
time.Sleep(5 * time.Second)
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
var tsKeyBytes []byte
var deadline dline.Info
err = t.db.QueryRow(context.Background(),
`Select tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
from wdpost_tasks
where task_id = $1`, taskID).Scan(
&tsKeyBytes,
&deadline.CurrentEpoch,
&deadline.PeriodStart,
&deadline.Index,
&deadline.Open,
&deadline.Close,
&deadline.Challenge,
&deadline.FaultCutoff,
&deadline.WPoStPeriodDeadlines,
&deadline.WPoStProvingPeriod,
&deadline.WPoStChallengeWindow,
&deadline.WPoStChallengeLookback,
&deadline.FaultDeclarationCutoff,
)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false, err
}
log.Errorf("tskEY: %v", tsKeyBytes)
tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
ts, err := t.scheduler.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
return false, err
}
submitWdPostParams, err := t.scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
return false, err
}
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
return true, nil
}
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return &ids[0], nil
}
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPostCompute",
Max: -1,
}
}
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
log.Errorf("WdPostTask.Adder() called ----------------------------- ")
// wait for any channels on t.tasks and call taskFunc on them
for taskDetails := range t.tasks {
log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails)
taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx)
})
}
}
func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler) *WdPostTask {
return &WdPostTask{
tasks: make(chan *WdPostTaskDetails, 2),
db: db,
scheduler: scheduler,
}
}
func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error {
t.tasks <- &WdPostTaskDetails{
Ts: ts,
Deadline: deadline,
}
log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks)
return nil
}
func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
tsKey := ts.Key()
log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId)
_, err := tx.Exec(
`INSERT INTO wdpost_tasks (
task_id,
tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
tsKey.Bytes(),
deadline.CurrentEpoch,
deadline.PeriodStart,
deadline.Index,
deadline.Open,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
)
if err != nil {
return false, err
}
return true, nil
}
func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID) error {
tsKey := ts.Key()
_, err := t.db.Exec(ctx,
`INSERT INTO wdpost_tasks (
task_id,
tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
tsKey.Bytes(),
deadline.CurrentEpoch,
deadline.PeriodStart,
deadline.Index,
deadline.Open,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
)
if err != nil {
return err
}
return nil
}
var _ harmonytask.TaskInterface = &WdPostTask{}

View File

@ -0,0 +1,24 @@
package wdpost
import (
"context"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/stretchr/testify/require"
"testing"
)
// test to create WDPostTask, invoke AddTask and check if the task is added to the DB
func TestAddTask(t *testing.T) {
db, err := harmonydb.New(nil, "yugabyte", "yugabyte", "yugabyte", "5433", "localhost", nil)
require.NoError(t, err)
wdPostTask := NewWdPostTask(db)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
ts := types.TipSet{}
deadline := dline.Info{}
err := wdPostTask.AddTask(context.Background(), &ts, &deadline)
require.NoError(t, err)
}