diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 45fbfcd95..d90eebaab 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/filecoin-project/lotus/storage/wdpost" "net" "os" "strings" @@ -210,7 +211,9 @@ var runCmd = &cli.Command{ return err } - taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{}, address) + wdPostTask := wdpost.NewWdPostTask(db) + + taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, address) if err != nil { return err } diff --git a/lib/harmony/harmonydb/sql/20230719.sql b/lib/harmony/harmonydb/sql/20230719.sql index 0a676526b..d4eb58326 100644 --- a/lib/harmony/harmonydb/sql/20230719.sql +++ b/lib/harmony/harmonydb/sql/20230719.sql @@ -18,7 +18,7 @@ CREATE TABLE harmony_task ( owner_id INTEGER REFERENCES harmony_machines (id) ON DELETE SET NULL, added_by INTEGER NOT NULL, previous_task INTEGER, - name varchar(8) NOT NULL + name varchar(16) NOT NULL ); COMMENT ON COLUMN harmony_task.initiated_by IS 'The task ID whose completion occasioned this task.'; COMMENT ON COLUMN harmony_task.owner_id IS 'The foreign key to harmony_machines.'; @@ -29,7 +29,7 @@ COMMENT ON COLUMN harmony_task.update_time IS 'When it was last modified. not a CREATE TABLE harmony_task_history ( id SERIAL PRIMARY KEY NOT NULL, task_id INTEGER NOT NULL, - name VARCHAR(8) NOT NULL, + name VARCHAR(16) NOT NULL, posted TIMESTAMP NOT NULL, work_start TIMESTAMP NOT NULL, work_end TIMESTAMP NOT NULL, @@ -41,12 +41,12 @@ COMMENT ON COLUMN harmony_task_history.result IS 'Use to detemine if this was a CREATE TABLE harmony_task_follow ( id SERIAL PRIMARY KEY NOT NULL, owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE, - to_type VARCHAR(8) NOT NULL, - from_type VARCHAR(8) NOT NULL + to_type VARCHAR(16) NOT NULL, + from_type VARCHAR(16) NOT NULL ); CREATE TABLE harmony_task_impl ( id SERIAL PRIMARY KEY NOT NULL, owner_id INTEGER NOT NULL REFERENCES harmony_machines (id) ON DELETE CASCADE, - name VARCHAR(8) NOT NULL + name VARCHAR(16) NOT NULL ); \ No newline at end of file diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 1ac4709d7..12992936e 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -54,7 +54,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { if err != nil { return nil, fmt.Errorf("could not read from harmony_machines: %w", err) } - gpuram := lo.Sum(reg.GpuRam) + gpuram := uint64(lo.Sum(reg.GpuRam)) if len(ownerID) == 0 { err = db.QueryRow(ctx, `INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram) VALUES diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 9ac22bde0..c1a71bb08 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -316,23 +316,23 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func ctx := helpers.LifecycleCtx(mctx, lc) - fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr, db) - - if err != nil { - return nil, err - } - //wdPostTask := wdpost.NewWdPostTask(db) - // + //taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") //if err != nil { // return nil, xerrors.Errorf("failed to create task engine: %w", err) //} - //handler := gin.New() - // - //taskEngine.ApplyHttpHandlers(handler.Group("/")) + ////handler := gin.New() + //// + ////taskEngine.ApplyHttpHandlers(handler.Group("/")) //defer taskEngine.GracefullyTerminate(time.Hour) + fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, maddr, db, nil) + + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ OnStart: func(context.Context) error { go fps.Run(ctx) diff --git a/storage/wdpost/wdpost_changehandler_2.go b/storage/wdpost/wdpost_changehandler_2.go index 0c20eea68..cb1facdcf 100644 --- a/storage/wdpost/wdpost_changehandler_2.go +++ b/storage/wdpost/wdpost_changehandler_2.go @@ -44,10 +44,10 @@ func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) { panic("implement me") } -func newChangeHandler2(api wdPoStCommands, actor address.Address, db *harmonydb.DB) *changeHandler2 { +func newChangeHandler2(api wdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 { log.Errorf("newChangeHandler2() called with api: %v, actor: %v", api, actor) //posts := newPostsCache() - p := newProver2(api, db) + p := newProver2(api, task) //s := newSubmitter(api, posts) return &changeHandler2{api: api, actor: actor, proveHdlr: p} } @@ -58,7 +58,6 @@ func (ch *changeHandler2) start() { } func (ch *changeHandler2) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error { - log.Errorf("changeHandler2.update() called with revert: %v, advance: %v", revert, advance) // Get the current deadline period di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key()) if err != nil { @@ -177,7 +176,8 @@ type proveHandler2 struct { func newProver2( api wdPoStCommands, //posts *postsCache, - db *harmonydb.DB, + //db *harmonydb.DB, + wdPostTask *WdPostTask, ) *proveHandler2 { ctx, cancel := context.WithCancel(context.Background()) return &proveHandler2{ @@ -187,7 +187,7 @@ func newProver2( hcs: make(chan *headChange), shutdownCtx: ctx, shutdown: cancel, - wdPostTask: NewWdPostTask(db), + wdPostTask: wdPostTask, } } @@ -205,7 +205,6 @@ func (p *proveHandler2) run() { return case hc := <-p.hcs: - log.Errorf("--------------------WINDOW POST PROVE HANDLER RECEIVE CHAN----------------------") // Head changed p.processHeadChange(hc.ctx, hc.advance, hc.di) if p.processedHeadChanges != nil { @@ -224,8 +223,6 @@ func (p *proveHandler2) run() { func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) { // If the post window has expired, abort the current proof - log.Errorf("--------------------WINDOW POST PROVE HANDLER PROCESS HC----------------------") - if p.current != nil && newTS.Height() >= p.current.di.Close { // Cancel the context on the current proof p.current.abort() diff --git a/storage/wdpost/wdpost_sched.go b/storage/wdpost/wdpost_sched.go index cf04cf5af..36829e107 100644 --- a/storage/wdpost/wdpost_sched.go +++ b/storage/wdpost/wdpost_sched.go @@ -3,6 +3,8 @@ package wdpost import ( "context" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/gin-gonic/gin" "time" "github.com/ipfs/go-cid" @@ -87,7 +89,8 @@ type WindowPoStScheduler struct { // failed abi.ChainEpoch // eps // failLk sync.Mutex - db *harmonydb.DB + db *harmonydb.DB + wdPostTask *WdPostTask } // NewWindowedPoStScheduler creates a new WindowPoStScheduler scheduler. @@ -100,7 +103,8 @@ func NewWindowedPoStScheduler(api NodeAPI, ft sealer.FaultTracker, j journal.Journal, actor address.Address, - db *harmonydb.DB) (*WindowPoStScheduler, error) { + db *harmonydb.DB, + task *WdPostTask) (*WindowPoStScheduler, error) { mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting sector size: %w", err) @@ -126,14 +130,27 @@ func NewWindowedPoStScheduler(api NodeAPI, evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"), evtTypeWdPoStFaults: j.RegisterEventType("wdpost", "faults_processed"), }, - journal: j, - db: db, + journal: j, + wdPostTask: task, + db: db, }, nil } func (s *WindowPoStScheduler) Run(ctx context.Context) { // Initialize change handler. + wdPostTask := NewWdPostTask(s.db) + + taskEngine, er := harmonytask.New(s.db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") + if er != nil { + //return nil, xerrors.Errorf("failed to create task engine: %w", err) + log.Errorf("failed to create task engine: %w", er) + } + handler := gin.New() + + taskEngine.ApplyHttpHandlers(handler.Group("/")) + defer taskEngine.GracefullyTerminate(time.Hour) + // callbacks is a union of the fullNodeFilteredAPI and ourselves. callbacks := struct { NodeAPI @@ -147,7 +164,7 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { defer s.ch.shutdown() s.ch.start() } else { - s.ch = newChangeHandler2(callbacks, s.actor, s.db) + s.ch = newChangeHandler2(callbacks, s.actor, wdPostTask) defer s.ch.shutdown() s.ch.start() } @@ -227,7 +244,6 @@ func (s *WindowPoStScheduler) Run(ctx context.Context) { } func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.TipSet) { - log.Errorf("WindowPoStScheduler.update() called with revert: %v, apply: %v", revert, apply) if apply == nil { log.Error("no new tipset in window post WindowPoStScheduler.update") return diff --git a/storage/wdpost/wdpost_task.go b/storage/wdpost/wdpost_task.go index b8bdb842b..8516bd766 100644 --- a/storage/wdpost/wdpost_task.go +++ b/storage/wdpost/wdpost_task.go @@ -30,15 +30,16 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID) (*harmonytask.TaskID, e func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ - Name: "WdPostGeneration", + Name: "WdPostCompute", } } func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) { + log.Errorf("WdPostTask.Adder() called ----------------------------- ") + // wait for any channels on t.tasks and call taskFunc on them for taskDetails := range t.tasks { - //taskDetails := <-ch log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails) @@ -57,8 +58,6 @@ func NewWdPostTask(db *harmonydb.DB) *WdPostTask { func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error { - //ch := make(chan *WdPostTaskDetails) - //t.tasks = append(t.tasks, ch) t.tasks <- &WdPostTaskDetails{ Ts: ts, Deadline: deadline, @@ -70,7 +69,11 @@ func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dl } func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) { + tsKey := ts.Key() + + log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId) + _, err := tx.Exec( `INSERT INTO wdpost_tasks ( task_id, diff --git a/storage/wdpost/wdpost_task_test.go b/storage/wdpost/wdpost_task_test.go new file mode 100644 index 000000000..476b49b88 --- /dev/null +++ b/storage/wdpost/wdpost_task_test.go @@ -0,0 +1,24 @@ +package wdpost + +import ( + "context" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/stretchr/testify/require" + "testing" +) + +// test to create WDPostTask, invoke AddTask and check if the task is added to the DB +func TestAddTask(t *testing.T) { + db, err := harmonydb.New(nil, "yugabyte", "yugabyte", "yugabyte", "5433", "localhost", nil) + require.NoError(t, err) + wdPostTask := NewWdPostTask(db) + taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300") + ts := types.TipSet{} + deadline := dline.Info{} + err := wdPostTask.AddTask(context.Background(), &ts, &deadline) + + require.NoError(t, err) +}