This commit is contained in:
Shrenuj Bansal 2023-08-30 12:11:09 -04:00
parent eacd5bb970
commit c0bd13eced
3 changed files with 41 additions and 19 deletions

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -322,6 +323,18 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
return nil, err return nil, err
} }
wdPostTask := wdpost.NewWdPostTask(db)
tasks := []harmonytask.TaskInterface{wdPostTask}
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
if err != nil {
return nil, xerrors.Errorf("failed to create task engine: %w", err)
}
//handler := gin.New()
//
//taskEngine.ApplyHttpHandlers(handler.Group("/"))
defer taskEngine.GracefullyTerminate(time.Hour)
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(context.Context) error { OnStart: func(context.Context) error {
go fps.Run(ctx) go fps.Run(ctx)

View File

@ -2,7 +2,6 @@ package wdpost
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"sync" "sync"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -32,18 +31,25 @@ type wdPoStCommands interface {
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info) recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
} }
type changeHandlerIface interface {
start()
update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error
shutdown()
currentTSDI() (*types.TipSet, *dline.Info)
}
var _ changeHandlerIface = &changeHandler{}
type changeHandler struct { type changeHandler struct {
api wdPoStCommands api wdPoStCommands
actor address.Address actor address.Address
proveHdlr *proveHandler proveHdlr *proveHandler
submitHdlr *submitHandler submitHdlr *submitHandler
db *harmonydb.DB
} }
func newChangeHandler(api wdPoStCommands, actor address.Address, db *harmonydb.DB) *changeHandler { func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
posts := newPostsCache() posts := newPostsCache()
p := newProver(api, posts, db) p := newProver(api, posts)
s := newSubmitter(api, posts) s := newSubmitter(api, posts)
return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s} return &changeHandler{api: api, actor: actor, proveHdlr: p, submitHdlr: s}
} }
@ -172,7 +178,6 @@ type proveHandler struct {
func newProver( func newProver(
api wdPoStCommands, api wdPoStCommands,
posts *postsCache, posts *postsCache,
db *harmonydb.DB,
) *proveHandler { ) *proveHandler {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &proveHandler{ return &proveHandler{
@ -182,7 +187,6 @@ func newProver(
hcs: make(chan *headChange), hcs: make(chan *headChange),
shutdownCtx: ctx, shutdownCtx: ctx,
shutdown: cancel, shutdown: cancel,
wdPostTask: NewWdPostTask(db),
} }
} }

View File

@ -77,8 +77,8 @@ type WindowPoStScheduler struct {
maxPartitionsPerPostMessage int maxPartitionsPerPostMessage int
maxPartitionsPerRecoveryMessage int maxPartitionsPerRecoveryMessage int
singleRecoveringPartitionPerPostMessage bool singleRecoveringPartitionPerPostMessage bool
ch *changeHandler ch changeHandlerIface
ch2 *changeHandler2 //ch2 *changeHandler2
actor address.Address actor address.Address
@ -140,13 +140,17 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
*WindowPoStScheduler *WindowPoStScheduler
}{s.api, s} }{s.api, s}
s.ch = newChangeHandler(callbacks, s.actor, s.db) run_on_lotus_provider := true
defer s.ch.shutdown()
s.ch.start()
s.ch2 = newChangeHandler2(callbacks, s.actor, s.db) if !run_on_lotus_provider {
defer s.ch2.shutdown() s.ch = newChangeHandler(callbacks, s.actor)
s.ch2.start() defer s.ch.shutdown()
s.ch.start()
} else {
s.ch = newChangeHandler2(callbacks, s.actor, s.db)
defer s.ch.shutdown()
s.ch.start()
}
var ( var (
notifs <-chan []*api.HeadChange notifs <-chan []*api.HeadChange
@ -223,6 +227,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) {
} }
func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) { func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) {
log.Errorf("WindowPoStScheduler.update() called with revert: %v, apply: %v", revert, apply)
if apply == nil { if apply == nil {
log.Error("no new tipset in window post WindowPoStScheduler.update") log.Error("no new tipset in window post WindowPoStScheduler.update")
return return
@ -232,10 +237,10 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
log.Errorf("handling head updates in window post sched: %+v", err) log.Errorf("handling head updates in window post sched: %+v", err)
} }
err = s.ch2.update(ctx, revert, apply) //err = s.ch2.update(ctx, revert, apply)
if err != nil { //if err != nil {
log.Errorf("handling head updates in window post sched: %+v", err) // log.Errorf("handling head updates in window post sched: %+v", err)
} //}
} }
// onAbort is called when generating proofs or submitting proofs is aborted // onAbort is called when generating proofs or submitting proofs is aborted