Merge branch 'feat/wdpost-adder' into wdpost-can-accept

This commit is contained in:
Andrew Jackson (Ajax) 2023-10-11 17:51:46 -05:00
parent 598e9b931d
commit 1f1e840e5c
16 changed files with 312 additions and 184 deletions

View File

@ -3,31 +3,39 @@ package main
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/storage/wdpost"
"net"
"net/http"
"os"
"strings"
"time"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/ipfs/go-datastore/namespace"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@ -175,14 +183,72 @@ var runCmd = &cli.Command{
if err != nil {
return err
}
_ = lp // here is where the config feeds into task runners
// The config feeds into task runners & their helpers
wdPostTask := wdpost.NewWdPostTask(db, nil)
var activeTasks []harmonytask.TaskInterface
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address)
ds, dsCloser, err := modules.DatastoreV2(ctx, false, lr)
if err != nil {
return err
}
defer dsCloser()
maddr, err := modules.MinerAddress(ds)
if err != nil {
return err
}
var verif storiface.Verifier = ffiwrapper.ProofVerifier
as, err := modules.AddressSelector(&lp.Addresses)()
j, err := fsjournal.OpenFSJournal(lr, journal.EnvDisabledEvents()) // TODO switch this into DB entries.
if err != nil {
return err
}
defer j.Close()
full, fullCloser, err := cliutil.GetFullNodeAPIV1(cctx) // TODO switch this into DB entries.
if err != nil {
return err
}
defer fullCloser()
si := paths.NewIndexProxy( /*TODO Alerting*/ nil, db, true)
lstor, err := paths.NewLocal(ctx, lr, si, nil /*TODO URLs*/)
if err != nil {
return err
}
sa, err := modules.StorageAuth(ctx, full)
if err != nil {
return err
}
stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
mds, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return err
}
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
sealer, err := sealer.New(ctx, lstor, stor, lr, si, lp.SealerConfig, config.ProvingConfig{}, wsts, smsts)
if err != nil {
return err
}
if lp.Subsystems.EnableWindowPost {
wdPostTask, err := modules.WindowPostSchedulerV2(ctx, lp.Fees, lp.Proving, full, sealer, verif, j,
as, maddr, db, lp.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
activeTasks = append(activeTasks, wdPostTask)
}
taskEngine, err := harmonytask.New(db, activeTasks, address)
if err != nil {
return err
}
handler := gin.New()
taskEngine.ApplyHttpHandlers(handler.Group("/"))
@ -222,6 +288,7 @@ var runCmd = &cli.Command{
//node.ShutdownHandler{Component: "provider", StopFunc: stop},
<-finishCh
return nil
},
}

View File

@ -45,7 +45,7 @@ func (t *task1) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, e
t.WorkCompleted = append(t.WorkCompleted, fmt.Sprintf("taskResult%d", t.myPersonalTable[tID]))
return true, nil
}
func (t *task1) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
func (t *task1) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
}
func (t *task1) TypeDetails() harmonytask.TaskTypeDetails {
@ -92,15 +92,15 @@ func TestHarmonyTasks(t *testing.T) {
type passthru struct {
dtl harmonytask.TaskTypeDetails
do func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error)
canAccept func(list []harmonytask.TaskID) (*harmonytask.TaskID, error)
canAccept func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error)
adder func(add harmonytask.AddTaskFunc)
}
func (t *passthru) Do(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
return t.do(tID, stillOwned)
}
func (t *passthru) CanAccept(list []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return t.canAccept(list)
func (t *passthru) CanAccept(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return t.canAccept(list, e)
}
func (t *passthru) TypeDetails() harmonytask.TaskTypeDetails {
return t.dtl
@ -117,8 +117,10 @@ var lettersMutex sync.Mutex
func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return nil, nil },
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return nil, nil
},
adder: func(add harmonytask.AddTaskFunc) {
for _, vTmp := range []string{"A", "B"} {
v := vTmp
@ -133,8 +135,10 @@ func fooLetterAdder(t *testing.T, cdb *harmonydb.DB) *passthru {
}
func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
return &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
},
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),
@ -206,8 +210,10 @@ func TestTaskRetry(t *testing.T) {
alreadyFailed := map[string]bool{}
var dest []string
fails2xPerMsg := &passthru{
dtl: dtl,
canAccept: func(list []harmonytask.TaskID) (*harmonytask.TaskID, error) { return &list[0], nil },
dtl: dtl,
canAccept: func(list []harmonytask.TaskID, e *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
return &list[0], nil
},
do: func(tID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
var content string
err = cdb.QueryRow(context.Background(),

View File

@ -57,7 +57,7 @@ type TaskInterface interface {
// return null if the task type is not allowed on this machine.
// It should select the task it most wants to accomplish.
// It is also responsible for determining & reserving disk space (including scratch).
CanAccept([]TaskID) (*TaskID, error)
CanAccept([]TaskID, *TaskEngine) (*TaskID, error)
// TypeDetails() returns static details about how this task behaves and
// how this machine will run it. Read once at the beginning.

View File

@ -84,7 +84,7 @@ top:
}
// 3. What does the impl say?
tID, err := h.CanAccept(ids)
tID, err := h.CanAccept(ids, h.TaskEngine)
if err != nil {
log.Error(err)
return false

View File

@ -0,0 +1,19 @@
package taskhelp
// SubsetIf returns a subset of the slice for which the predicate is true.
// It does not allocate memory, but rearranges the list in place.
// A non-zero list input will always return a non-zero list.
// The return value is the subset and a boolean indicating whether the subset was sliced.
func SliceIfFound[T any](slice []T, f func(T) bool) ([]T, bool) {
ct := 0
for i, v := range slice {
if f(v) {
slice[ct], slice[i] = slice[i], slice[ct]
ct++
}
}
if ct == 0 {
return slice, false
}
return slice[:ct], true
}

View File

@ -1,15 +0,0 @@
package tasks
func SliceIfFound[T any](slice []T, f func(T) bool) []T {
ct := 0
for i, v := range slice {
if f(v) {
slice[ct], slice[i] = slice[i], slice[ct]
ct++
}
}
if ct == 0 {
return slice
}
return slice[:ct]
}

View File

@ -1,78 +0,0 @@
package wdpost
import (
"context"
"github.com/samber/lo"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type WindowPostTaskHandler struct {
max int // TODO read from Flags
*harmonytask.TaskEngine // TODO populate at setup time
Chain full.ChainModuleAPI
}
func New(chain full.ChainModuleAPI) *WindowPostTaskHandler {
// TODO
return &WindowPostTaskHandler{
Chain: chain,
}
}
func (wp *WindowPostTaskHandler) CanAccept(tids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
// GetEpoch
ts, err := wp.Chain.ChainHead(context.Background())
if err != nil {
return nil, err
}
// TODO GetDeadline Epochs for tasks
type wdTaskDef struct {
abi.RegisteredSealProof
}
var tasks []wdTaskDef
// TODO accept those past deadline, then do the right thing in Do()
// TODO Exit nil if no disk available?
// Discard those too big for our free RAM
freeRAM := wp.TaskEngine.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
return res[d.RegisteredSealProof].MaxMemory <= freeRAM
})
// TODO If Local Disk, discard others
// TODO If Shared Disk entries, discard others
// TODO Select the one closest to the deadline
// FUTURE: Be less greedy: let the best machine do the work.
// FUTURE: balance avoiding 2nd retries (3rd run)
return nil, nil
}
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
func (wp *WindowPostTaskHandler) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
Max: wp.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
Gpu: 1,
// RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i {
return res[k].MaxMemory
}
return i
}, 1<<63),
},
}
}

View File

@ -369,7 +369,7 @@ func DefaultLotusProvider() *LotusProviderConfig {
MaxWindowPoStGasFee: types.MustParseFIL("5"),
MaxPublishDealsFee: types.MustParseFIL("0.05"),
},
Addresses: LotusProviderAddresses{
Addresses: MinerAddressConfig{
PreCommitControl: []string{},
CommitControl: []string{},
TerminateControl: []string{},

View File

@ -69,14 +69,17 @@ type StorageMiner struct {
type LotusProviderConfig struct {
Subsystems ProviderSubsystemsConfig
Fees LotusProviderFees
Addresses LotusProviderAddresses
Proving ProvingConfig
Fees LotusProviderFees
Addresses MinerAddressConfig
Proving ProvingConfig
SealingParams SealingConfig // TODO defaults
SealerConfig // TODO defaults
}
type ProviderSubsystemsConfig struct {
EnableWindowPost bool
EnableWinningPost bool
EnableWindowPost bool
WindowPostMaxTasks int
EnableWinningPost bool
}
type DAGStoreConfig struct {

View File

@ -57,3 +57,22 @@ func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r
return bds, nil
}
}
func DatastoreV2(ctx context.Context, disableLog bool, r repo.LockedRepo) (dtypes.MetadataDS, func() error, error) {
mds, err := r.Datastore(ctx, "/metadata")
if err != nil {
return nil, nil, err
}
var logdir string
if !disableLog {
logdir = filepath.Join(r.Path(), "kvlog/metadata")
}
bds, err := backupds.Wrap(mds, logdir)
if err != nil {
return nil, nil, xerrors.Errorf("opening backupds: %w", err)
}
return bds, bds.CloseLog, nil
}

View File

@ -5,13 +5,14 @@ import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
@ -316,17 +317,6 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
ctx := helpers.LifecycleCtx(mctx, lc)
//wdPostTask := wdpost.NewWdPostTask(db)
//taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
//if err != nil {
// return nil, xerrors.Errorf("failed to create task engine: %w", err)
//}
////handler := gin.New()
////
////taskEngine.ApplyHttpHandlers(handler.Group("/"))
//defer taskEngine.GracefullyTerminate(time.Hour)
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr, db, nil)
if err != nil {
@ -344,6 +334,29 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
}
}
func WindowPostSchedulerV2(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
as *ctladdr.AddressSelector, maddr dtypes.MinerAddress, db *harmonydb.DB, max int) (*wdpost.WdPostTask, error) {
fc2 := config.MinerFeeConfig{
MaxPreCommitGasFee: fc.MaxPreCommitGasFee,
MaxCommitGasFee: fc.MaxCommitGasFee,
MaxTerminateGasFee: fc.MaxTerminateGasFee,
MaxPublishDealsFee: fc.MaxPublishDealsFee,
}
ts := wdpost.NewWdPostTask(db, nil, max)
fps, err := wdpost.NewWindowedPoStScheduler(api, fc2, pc, as, sealer, verif, sealer, j, address.Address(maddr), db, ts)
ts.Scheduler = fps
if err != nil {
return nil, err
}
go fps.RunV2(ctx, func(api wdpost.WdPoStCommands, actor address.Address) wdpost.ChangeHandlerIface {
return wdpost.NewChangeHandler2(api, actor, ts)
})
return ts, nil
}
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) {
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
lc.Append(fx.Hook{

View File

@ -20,9 +20,9 @@ const (
type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
type CompleteSubmitPoSTCb func(err error)
// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// WdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
// by the changeHandler to execute actions and query state.
type wdPoStCommands interface {
type WdPoStCommands interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
@ -31,23 +31,23 @@ type wdPoStCommands interface {
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
}
type changeHandlerIface interface {
type ChangeHandlerIface interface {
start()
update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error
shutdown()
currentTSDI() (*types.TipSet, *dline.Info)
}
var _ changeHandlerIface = &changeHandler{}
var _ ChangeHandlerIface = &changeHandler{}
type changeHandler struct {
api wdPoStCommands
api WdPoStCommands
actor address.Address
proveHdlr *proveHandler
submitHdlr *submitHandler
}
func newChangeHandler(api wdPoStCommands, actor address.Address) *changeHandler {
func newChangeHandler(api WdPoStCommands, actor address.Address) *changeHandler {
posts := newPostsCache()
p := newProver(api, posts)
s := newSubmitter(api, posts)
@ -157,7 +157,7 @@ type postResult struct {
// proveHandler generates proofs
type proveHandler struct {
api wdPoStCommands
api WdPoStCommands
posts *postsCache
postResults chan *postResult
@ -176,7 +176,7 @@ type proveHandler struct {
}
func newProver(
api wdPoStCommands,
api WdPoStCommands,
posts *postsCache,
) *proveHandler {
ctx, cancel := context.WithCancel(context.Background())
@ -315,7 +315,7 @@ type postInfo struct {
// submitHandler submits proofs on-chain
type submitHandler struct {
api wdPoStCommands
api WdPoStCommands
posts *postsCache
submitResults chan *submitResult
@ -339,7 +339,7 @@ type submitHandler struct {
}
func newSubmitter(
api wdPoStCommands,
api WdPoStCommands,
posts *postsCache,
) *submitHandler {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -2,6 +2,7 @@ package wdpost
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
@ -28,10 +29,10 @@ import (
// recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
//}
var _ changeHandlerIface = &changeHandler2{}
var _ ChangeHandlerIface = &changeHandler2{}
type changeHandler2 struct {
api wdPoStCommands
api WdPoStCommands
actor address.Address
proveHdlr *proveHandler2
//submitHdlr *submitHandler
@ -44,8 +45,8 @@ func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
panic("implement me")
}
func newChangeHandler2(api wdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 {
log.Errorf("newChangeHandler2() called with api: %v, actor: %v", api, actor)
func NewChangeHandler2(api WdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 {
log.Errorf("NewChangeHandler2() called with api: %v, actor: %v", api, actor)
//posts := newPostsCache()
p := newProver2(api, task)
//s := newSubmitter(api, posts)
@ -155,7 +156,7 @@ func (ch *changeHandler2) shutdown() {
// proveHandler generates proofs
type proveHandler2 struct {
api wdPoStCommands
api WdPoStCommands
//posts *postsCache
//postResults chan *postResult
@ -175,7 +176,7 @@ type proveHandler2 struct {
}
func newProver2(
api wdPoStCommands,
api WdPoStCommands,
//posts *postsCache,
//db *harmonydb.DB,
wdPostTask *WdPostTask,

View File

@ -2,11 +2,10 @@ package wdpost
import (
"context"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/gin-gonic/gin"
"time"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/trace"
@ -80,8 +79,7 @@ type WindowPoStScheduler struct {
maxPartitionsPerPostMessage int
maxPartitionsPerRecoveryMessage int
singleRecoveringPartitionPerPostMessage bool
ch changeHandlerIface
//ch2 *changeHandler2
ch ChangeHandlerIface
actor address.Address
@ -140,35 +138,20 @@ func NewWindowedPoStScheduler(api NodeAPI,
func (s *WindowPoStScheduler) Run(ctx context.Context) {
// Initialize change handler.
wdPostTask := NewWdPostTask(s.db, s)
taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
if er != nil {
//return nil, xerrors.Errorf("failed to create task engine: %w", err)
log.Errorf("failed to create task engine: %w", er)
}
handler := gin.New()
taskEngine.ApplyHttpHandlers(handler.Group("/"))
defer taskEngine.GracefullyTerminate(time.Hour)
s.RunV2(ctx, func(api WdPoStCommands, actor address.Address) ChangeHandlerIface {
return newChangeHandler(api, actor)
})
}
func (s *WindowPoStScheduler) RunV2(ctx context.Context, f func(api WdPoStCommands, actor address.Address) ChangeHandlerIface) {
// callbacks is a union of the fullNodeFilteredAPI and ourselves.
callbacks := struct {
NodeAPI
*WindowPoStScheduler
}{s.api, s}
run_on_lotus_provider := true
if !run_on_lotus_provider {
s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown()
s.ch.start()
} else {
s.ch = newChangeHandler2(callbacks, s.actor, wdPostTask)
defer s.ch.shutdown()
s.ch.start()
}
s.ch = f(callbacks, s.actor)
defer s.ch.shutdown()
s.ch.start()
var (
notifs <-chan []*api.HeadChange

View File

@ -2,11 +2,19 @@ package wdpost
import (
"context"
"sort"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"time"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/harmony/taskhelp"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/samber/lo"
)
type WdPostTaskDetails struct {
@ -17,7 +25,8 @@ type WdPostTaskDetails struct {
type WdPostTask struct {
tasks chan *WdPostTaskDetails
db *harmonydb.DB
scheduler *WindowPoStScheduler
Scheduler *WindowPoStScheduler
max int
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
@ -65,12 +74,27 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
log.Errorf("tskEY: %v", tsKeyBytes)
tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
ts, err := t.scheduler.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err)
return false, err
}
head, err := t.Scheduler.api.ChainHead(context.Background())
if err != nil {
log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
return false, err
}
if deadline.Close > head.Height() {
log.Errorf("WdPost removed stale task: %v %v", taskID, tsKey)
return true, nil
}
ts, err := t.Scheduler.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
return false, err
}
submitWdPostParams, err := t.scheduler.runPoStCycle(context.Background(), false, deadline, ts)
submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
return false, err
@ -81,14 +105,91 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return true, nil
}
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, error) {
return &ids[0], nil
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// GetEpoch
ts, err := t.Scheduler.api.ChainHead(context.Background())
if err != nil {
return nil, err
}
// GetData for tasks
type wdTaskDef struct {
abi.RegisteredSealProof
Task_id harmonytask.TaskID
Tskey []byte
Open abi.ChainEpoch
Close abi.ChainEpoch
}
var tasks []wdTaskDef
err = t.db.Select(context.Background(), tasks,
`Select tskey,
task_id,
period_start,
open,
close
from wdpost_tasks
where task_id IN $1`, ids)
if err != nil {
return nil, err
}
// Accept those past deadline, then delete them in Do().
for _, task := range tasks {
if task.Close < ts.Height() {
return &task.Task_id, nil
}
}
// Discard those too big for our free RAM
freeRAM := te.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
return res[d.RegisteredSealProof].MaxMemory <= freeRAM
})
if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task")
return nil, nil
}
// Ignore those with too many failures unless they are the only ones left.
tasks, _ = taskhelp.SliceIfFound(tasks, func(d wdTaskDef) bool {
var r int
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
FROM harmony_task_history
WHERE task_id = $1 AND success = false`, d.Task_id).Scan(&r)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
}
return r < 2
})
// Select the one closest to the deadline
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Close < tasks[j].Close
})
return &tasks[0].Task_id, nil
}
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPostCompute",
Max: -1,
Name: "WdPost",
Max: t.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
Gpu: 1,
// RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i {
return res[k].MaxMemory
}
return i
}, 1<<63),
},
}
}
@ -107,11 +208,12 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
}
}
func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler) *WdPostTask {
func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler, max int) *WdPostTask {
return &WdPostTask{
tasks: make(chan *WdPostTaskDetails, 2),
db: db,
scheduler: scheduler,
Scheduler: scheduler,
max: max,
}
}

View File

@ -2,19 +2,27 @@ package wdpost
import (
"context"
"testing"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/node/config"
"github.com/stretchr/testify/require"
"testing"
)
// test to create WDPostTask, invoke AddTask and check if the task is added to the DB
func TestAddTask(t *testing.T) {
db, err := harmonydb.New(nil, "yugabyte", "yugabyte", "yugabyte", "5433", "localhost", nil)
db, err := harmonydb.NewFromConfig(config.HarmonyDB{
Hosts: []string{"localhost"},
Port: "5433",
Username: "yugabyte",
Password: "yugabyte",
Database: "yugabyte",
})
require.NoError(t, err)
wdPostTask := NewWdPostTask(db)
wdPostTask := NewWdPostTask(db, nil, 0)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
ts := types.TipSet{}
deadline := dline.Info{}