diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go
index d2646c55d..a1d1e58a8 100644
--- a/cmd/lotus-provider/run.go
+++ b/cmd/lotus-provider/run.go
@@ -10,12 +10,12 @@ import (
 	"github.com/gin-contrib/pprof"
 	"github.com/gin-gonic/gin"
+	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/namespace"
 	"github.com/pkg/errors"
 	"github.com/urfave/cli/v2"
 	"go.opencensus.io/stats"
 	"go.opencensus.io/tag"
-	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-jsonrpc/auth"
@@ -82,6 +82,11 @@ var runCmd = &cli.Command{
 			Usage: "path to json file containing storage config",
 			Value: "~/.lotus/storage.json",
 		},
+		&cli.StringFlag{
+			Name:  "journal",
+			Usage: "path to journal files",
+			Value: "~/.lotus/",
+		},
 	},
 	Action: func(cctx *cli.Context) (err error) {
 		defer func() {
@@ -139,31 +144,6 @@ var runCmd = &cli.Command{
 			if err := r.Init(repo.Provider); err != nil {
 				return err
 			}
-			/*
-				lr, err := r.Lock(repo.Provider)
-				if err != nil {
-					return err
-				}
-
-				var localPaths []storiface.LocalPath
-
-				if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
-					sc.StoragePaths = append(sc.StoragePaths, localPaths...)
-				}); err != nil {
-					return fmt.Errorf("set storage config: %w", err)
-				}
-
-				{
-					// init datastore for r.Exists
-					_, err := lr.Datastore(context.Background(), "/metadata")
-					if err != nil {
-						return err
-					}
-				}
-				if err := lr.Close(); err != nil {
-					return fmt.Errorf("close repo: %w", err)
-				}
-			*/
 		}
 
 		db, err := makeDB(cctx)
@@ -172,16 +152,6 @@ var runCmd = &cli.Command{
 		}
 
 		shutdownChan := make(chan struct{})
-		/* defaults break lockedRepo (below)
-		stop, err := node.New(ctx,
-			node.Override(new(dtypes.ShutdownChan), shutdownChan),
-			node.Provider(r),
-		)
-		if err != nil {
-			return fmt.Errorf("creating node: %w", err)
-		}
-		*/
-
 		const unspecifiedAddress = "0.0.0.0"
 		listenAddr := cctx.String("listen")
 		addressSlice := strings.Split(listenAddr, ":")
@@ -195,31 +165,15 @@ var runCmd = &cli.Command{
 			}
 		}
 
-		lr, err := r.Lock(repo.Provider)
-		if err != nil {
-			return err
-		}
-		defer func() {
-			if err := lr.Close(); err != nil {
-				log.Error("closing repo", err)
-			}
-		}()
-		if err := lr.SetAPIToken([]byte(listenAddr)); err != nil { // our assigned listen address is our unique token
-			return xerrors.Errorf("setting api token: %w", err)
-		}
-		localStore, err := paths.NewLocal(ctx, &paths.BasicLocalStorage{
-			PathToJSON: cctx.String("storage-json"),
-		}, nil, []string{"http://" + listenAddr + "/remote"})
-		if err != nil {
-			return err
-		}
+		///////////////////////////////////////////////////////////////////////
+		///// Dependency Setup
+		///////////////////////////////////////////////////////////////////////
+
+		// The config feeds into task runners & their helpers
 		cfg, err := getConfig(cctx, db)
 		if err != nil {
 			return err
 		}
 
-		// The config feeds into task runners & their helpers
-
-		var activeTasks []harmonytask.TaskInterface
 		var verif storiface.Verifier = ffiwrapper.ProofVerifier
@@ -232,20 +186,12 @@ var runCmd = &cli.Command{
 		if err != nil {
 			return err
 		}
-		j, err := fsjournal.OpenFSJournal(lr, de)
+		j, err := fsjournal.OpenFSJournal2(cctx.String("journal"), de)
 		if err != nil {
 			return err
 		}
 		defer j.Close()
 
-		al := alerting.NewAlertingSystem(j)
-		si := paths.NewIndexProxy(al, db, true)
-
-		lstor, err := paths.NewLocal(ctx, lr, si, nil /*TODO URLs*/)
-		if err != nil {
-			return err
-		}
-
 		full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.FULLNODE_API_INFO)
 		if err != nil {
 			return err
@@ -257,24 +203,25 @@ var runCmd = &cli.Command{
 			return err
 		}
 
-		stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
-		mds, err := lr.Datastore(ctx, "/metadata") // TODO rm datastore after sector-info moves to DB
+		al := alerting.NewAlertingSystem(j)
+		si := paths.NewIndexProxy(al, db, true)
+		bls := &paths.BasicLocalStorage{
+			PathToJSON: cctx.String("storage-json"),
+		}
+		localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
 		if err != nil {
 			return err
 		}
-		wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
-		smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
-		sealer, err := sealer.New(ctx, lstor, stor, lr, si, cfg.SealerConfig, config.ProvingConfig{}, wsts, smsts)
-		if err != nil {
-			return err
-		}
+		stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
 
-		//ds, dsCloser, err := modules.DatastoreV2(ctx, false, lr)
+		unusedDataStore := datastore.NewMapDatastore()
+		wsts := statestore.New(namespace.Wrap(unusedDataStore, modules.WorkerCallsPrefix))
+		smsts := statestore.New(namespace.Wrap(unusedDataStore, modules.ManagerWorkPrefix))
+		sealer, err := sealer.New(ctx, localStore, stor, bls, si, cfg.SealerConfig, config.ProvingConfig{}, wsts, smsts)
 		if err != nil {
 			return err
 		}
-		//defer dsCloser()
 
 		var maddrs []dtypes.MinerAddress
 		for _, s := range cfg.Addresses.MinerAddresses {
@@ -285,13 +232,20 @@ var runCmd = &cli.Command{
 			maddrs = append(maddrs, dtypes.MinerAddress(addr))
 		}
 
-		if cfg.Subsystems.EnableWindowPost {
-			wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
-				as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks)
-			if err != nil {
-				return err
+		///////////////////////////////////////////////////////////////////////
+		///// Task Selection
+		///////////////////////////////////////////////////////////////////////
+		var activeTasks []harmonytask.TaskInterface
+		{
+
+			if cfg.Subsystems.EnableWindowPost {
+				wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
+					as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks)
+				if err != nil {
+					return err
+				}
+				activeTasks = append(activeTasks, wdPostTask)
 			}
-			activeTasks = append(activeTasks, wdPostTask)
 		}
 		taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
 		if err != nil {
@@ -333,6 +287,7 @@ var runCmd = &cli.Command{
 		*/
 
 		// Monitor for shutdown.
+		// TODO provide a graceful shutdown API on shutdownChan
 		finishCh := node.MonitorShutdown(shutdownChan)
 		//node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper},
 		//node.ShutdownHandler{Component: "provider", StopFunc: stop},
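
The TODO added above ("provide a graceful shutdown API on shutdownChan") is not addressed in this patch. For orientation only, here is a minimal sketch of one way a caller could drive that channel, assuming shutdownChan keeps the chan struct{} type created in runCmd and that finishCh (the value returned by node.MonitorShutdown) unblocks once the registered handlers have run; os, os/signal and syscall are standard library:

	// Hypothetical wiring, not part of the diff: translate OS signals into a shutdown request.
	sigCh := make(chan os.Signal, 2)
	signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
	go func() {
		<-sigCh
		shutdownChan <- struct{}{} // asks MonitorShutdown to run its shutdown handlers
	}()
	<-finishCh // blocks until shutdown has completed
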
diff --git a/go.mod b/go.mod
index 4c654c16e..0d11cfd06 100644
--- a/go.mod
+++ b/go.mod
@@ -260,6 +260,7 @@ require (
 	github.com/ipfs/go-verifcid v0.0.2 // indirect
 	github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
 	github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
+	github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
 	github.com/jackc/pgpassfile v1.0.0 // indirect
 	github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
 	github.com/jackc/puddle/v2 v2.2.0 // indirect
diff --git a/go.sum b/go.sum
index 9beece7f2..9fff24eed 100644
--- a/go.sum
+++ b/go.sum
@@ -888,6 +888,8 @@ github.com/ipni/index-provider v0.12.0 h1:R3F6dxxKNv4XkE4GJZNLOG0bDEbBQ/S5iztXwS
 github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4=
 github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
 github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
+github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
+github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
 github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
 github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
 github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=
diff --git a/journal/fsjournal/fs.go b/journal/fsjournal/fs.go
index 71aaa95a5..e3e2acb80 100644
--- a/journal/fsjournal/fs.go
+++ b/journal/fsjournal/fs.go
@@ -37,7 +37,11 @@ type fsJournal struct {
 // OpenFSJournal constructs a rolling filesystem journal, with a default
 // per-file size limit of 1GiB.
 func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal.Journal, error) {
-	dir := filepath.Join(lr.Path(), "journal")
+	return OpenFSJournal2(lr.Path(), disabled)
+}
+
+func OpenFSJournal2(path string, disabled journal.DisabledEvents) (journal.Journal, error) {
+	dir := filepath.Join(path, "journal")
 	if err := os.MkdirAll(dir, 0755); err != nil {
 		return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
 	}
diff --git a/lib/harmony/harmonydb/userfuncs.go b/lib/harmony/harmonydb/userfuncs.go
index c4620d449..ffab5903c 100644
--- a/lib/harmony/harmonydb/userfuncs.go
+++ b/lib/harmony/harmonydb/userfuncs.go
@@ -2,9 +2,12 @@ package harmonydb
 
 import (
 	"context"
+	"errors"
 
 	"github.com/georgysavva/scany/v2/pgxscan"
+	"github.com/jackc/pgerrcode"
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgconn"
 )
 
 // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
@@ -146,3 +149,8 @@ func (t *Tx) QueryRow(sql rawStringOnly, arguments ...any) Row {
 func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
 	return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...)
 }
+
+func IsErrUniqueContraint(err error) bool {
+	var e2 *pgconn.PgError
+	return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
+}
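
IsErrUniqueContraint gives callers a way to recognize a duplicate-key INSERT without matching on error text: it unwraps the error to *pgconn.PgError and compares its code against pgerrcode.UniqueViolation. A minimal usage sketch, assuming db is a *harmonydb.DB and with the column list trimmed for illustration (addTaskToDB in the new wdpost_task.go below relies on exactly this pattern):

	// Insert-if-absent: a unique-constraint violation means the row is already there.
	_, err := db.Exec(ctx, `INSERT INTO wdpost_tasks (task_id, tskey) VALUES ($1, $2)`, taskID, tsKeyBytes)
	if harmonydb.IsErrUniqueContraint(err) {
		return nil // already present; treat as success
	}
	return err
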
diff --git a/storage/wdpost/wdpost_task.go b/storage/wdpost/wdpost_task.go
new file mode 100644
index 000000000..69336a706
--- /dev/null
+++ b/storage/wdpost/wdpost_task.go
@@ -0,0 +1,351 @@
+package wdpost
+
+import (
+	"bytes"
+	"context"
+	"sort"
+	"time"
+
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/dline"
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
+	"github.com/filecoin-project/lotus/lib/harmony/resources"
+	"github.com/filecoin-project/lotus/lib/harmony/taskhelp"
+	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+	"github.com/samber/lo"
+	cbg "github.com/whyrusleeping/cbor-gen"
+)
+
+type WdPostTaskDetails struct {
+	Ts       *types.TipSet
+	Deadline *dline.Info
+}
+
+type WdPostTask struct {
+	tasks     chan *WdPostTaskDetails
+	db        *harmonydb.DB
+	Scheduler *WindowPoStScheduler
+	max       int
+}
+
+func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+
+	time.Sleep(5 * time.Second)
+	log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
+
+	var tsKeyBytes []byte
+	var deadline dline.Info
+
+	err = t.db.QueryRow(context.Background(),
+		`Select tskey,
+			current_epoch,
+			period_start,
+			index,
+			open,
+			close,
+			challenge,
+			fault_cutoff,
+			wpost_period_deadlines,
+			wpost_proving_period,
+			wpost_challenge_window,
+			wpost_challenge_lookback,
+			fault_declaration_cutoff
+			from wdpost_tasks
+			where task_id = $1`, taskID).Scan(
+		&tsKeyBytes,
+		&deadline.CurrentEpoch,
+		&deadline.PeriodStart,
+		&deadline.Index,
+		&deadline.Open,
+		&deadline.Close,
+		&deadline.Challenge,
+		&deadline.FaultCutoff,
+		&deadline.WPoStPeriodDeadlines,
+		&deadline.WPoStProvingPeriod,
+		&deadline.WPoStChallengeWindow,
+		&deadline.WPoStChallengeLookback,
+		&deadline.FaultDeclarationCutoff,
+	)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
+		return false, err
+	}
+
+	log.Errorf("tskEY: %v", tsKeyBytes)
+	tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err)
+		return false, err
+	}
+	head, err := t.Scheduler.api.ChainHead(context.Background())
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
+		return false, err
+	}
+
+	if deadline.Close > head.Height() {
+		log.Errorf("WdPost removed stale task: %v %v", taskID, tsKey)
+		return true, nil
+	}
+
+	ts, err := t.Scheduler.api.ChainGetTipSet(context.Background(), tsKey)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
+		return false, err
+	}
+	submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
+		return false, err
+	}
+
+	log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
+
+	// Enter an entry for each wdpost message proof into the wdpost_proofs table
+	for _, params := range submitWdPostParams {
+
+		// Convert submitWdPostParams.Partitions to a byte array using CBOR
+		buf := new(bytes.Buffer)
+		scratch := make([]byte, 9)
+		if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
+			return false, err
+		}
+		for _, v := range params.Partitions {
+			if err := v.MarshalCBOR(buf); err != nil {
+				return false, err
+			}
+		}
+
+		// Insert into wdpost_proofs table
+		_, err = t.db.Exec(context.Background(),
+			`INSERT INTO wdpost_proofs (
+				deadline,
+				partitions,
+				proof_type,
+				proof_bytes)
+			VALUES ($1, $2, $3, $4)`,
+			params.Deadline,
+			buf.Bytes(),
+			params.Proofs[0].PoStProof,
+			params.Proofs[0].ProofBytes)
+	}
+
+	return true, nil
+}
+
+func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+	// GetEpoch
+	ts, err := t.Scheduler.api.ChainHead(context.Background())
+
+	if err != nil {
+		return nil, err
+	}
+
+	// GetData for tasks
+	type wdTaskDef struct {
+		abi.RegisteredSealProof
+		Task_id harmonytask.TaskID
+		Tskey   []byte
+		Open    abi.ChainEpoch
+		Close   abi.ChainEpoch
+	}
+	var tasks []wdTaskDef
+	err = t.db.Select(context.Background(), &tasks,
+		`Select tskey,
+			task_id,
+			period_start,
+			open,
+			close
+			from wdpost_tasks
+			where task_id IN $1`, ids)
+	if err != nil {
+		return nil, err
+	}
+
+	// Accept those past deadline, then delete them in Do().
+	for _, task := range tasks {
+		if task.Close < ts.Height() {
+			return &task.Task_id, nil
+		}
+	}
+
+	// Discard those too big for our free RAM
+	freeRAM := te.ResourcesAvailable().Ram
+	tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
+		return res[d.RegisteredSealProof].MaxMemory <= freeRAM
+	})
+	if len(tasks) == 0 {
+		log.Infof("RAM too small for any WDPost task")
+		return nil, nil
+	}
+
+	// Ignore those with too many failures unless they are the only ones left.
+	tasks, _ = taskhelp.SliceIfFound(tasks, func(d wdTaskDef) bool {
+		var r int
+		err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
+			FROM harmony_task_history
+			WHERE task_id = $1 AND success = false`, d.Task_id).Scan(&r)
+		if err != nil {
+			log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
+		}
+		return r < 2
+	})
+
+	// Select the one closest to the deadline
+	sort.Slice(tasks, func(i, j int) bool {
+		return tasks[i].Close < tasks[j].Close
+	})
+
+	return &tasks[0].Task_id, nil
+}
+
+var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
+
+func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
+	return harmonytask.TaskTypeDetails{
+		Name:        "WdPost",
+		Max:         t.max,
+		MaxFailures: 3,
+		Follows:     nil,
+		Cost: resources.Resources{
+			Cpu: 1,
+			Gpu: 1,
+			// RAM of smallest proof's max is listed here
+			Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
+				if res[k].MaxMemory < i {
+					return res[k].MaxMemory
+				}
+				return i
+			}, 1<<63),
+		},
+	}
+}
+
+func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
+
+	// wait for any channels on t.tasks and call taskFunc on them
+	for taskDetails := range t.tasks {
+
+		//log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails)
+
+		taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
+			return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx)
+		})
+	}
+}
+
+func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler, max int) *WdPostTask {
+	return &WdPostTask{
+		tasks:     make(chan *WdPostTaskDetails, 2),
+		db:        db,
+		Scheduler: scheduler,
+		max:       max,
+	}
+}
+
+func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error {
+
+	t.tasks <- &WdPostTaskDetails{
+		Ts:       ts,
+		Deadline: deadline,
+	}
+
+	//log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks)
+
+	return nil
+}
+
+func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
+
+	tsKey := ts.Key()
+
+	//log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId)
+
+	_, err := tx.Exec(
+		`INSERT INTO wdpost_tasks (
+			task_id,
+			tskey,
+			current_epoch,
+			period_start,
+			index,
+			open,
+			close,
+			challenge,
+			fault_cutoff,
+			wpost_period_deadlines,
+			wpost_proving_period,
+			wpost_challenge_window,
+			wpost_challenge_lookback,
+			fault_declaration_cutoff
+		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
+		taskId,
+		tsKey.Bytes(),
+		deadline.CurrentEpoch,
+		deadline.PeriodStart,
+		deadline.Index,
+		deadline.Open,
+		deadline.Close,
+		deadline.Challenge,
+		deadline.FaultCutoff,
+		deadline.WPoStPeriodDeadlines,
+		deadline.WPoStProvingPeriod,
+		deadline.WPoStChallengeWindow,
+		deadline.WPoStChallengeLookback,
+		deadline.FaultDeclarationCutoff,
+	)
+	if err != nil {
+		if harmonydb.IsErrUniqueContraint(err) {
+			return false, nil // already in DB, not worth logging as an error.
+		}
+		return false, err
+	}
+
+	return true, nil
+}
+
+func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID) error {
+
+	tsKey := ts.Key()
+	_, err := t.db.Exec(ctx,
+		`INSERT INTO wdpost_tasks (
+			task_id,
+			tskey,
+			current_epoch,
+			period_start,
+			index,
+			open,
+			close,
+			challenge,
+			fault_cutoff,
+			wpost_period_deadlines,
+			wpost_proving_period,
+			wpost_challenge_window,
+			wpost_challenge_lookback,
+			fault_declaration_cutoff
+		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
+		taskId,
+		tsKey.Bytes(),
+		deadline.CurrentEpoch,
+		deadline.PeriodStart,
+		deadline.Index,
+		deadline.Open,
+		deadline.Close,
+		deadline.Challenge,
+		deadline.FaultCutoff,
+		deadline.WPoStPeriodDeadlines,
+		deadline.WPoStProvingPeriod,
+		deadline.WPoStChallengeWindow,
+		deadline.WPoStChallengeLookback,
+		deadline.FaultDeclarationCutoff,
+	)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+var _ harmonytask.TaskInterface = &WdPostTask{}
diff --git a/~/.lotus/journal/lotus-journal.ndjson b/~/.lotus/journal/lotus-journal.ndjson
new file mode 100644
index 000000000..e69de29bb
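
The empty ~/.lotus/journal/lotus-journal.ndjson entry just above looks like a side effect of the new --journal default: Go performs no "~" expansion, so OpenFSJournal2 joins the flag value into the literal path ~/.lotus/journal relative to the working directory, which is presumably how the file ended up inside the repository. A sketch of the behaviour; the expansion step is an assumption of this note, not something the patch does:

	// With the default flag value "~/.lotus/", dir becomes the literal "~/.lotus/journal".
	// A hypothetical expansion step before calling OpenFSJournal2 would avoid the stray "~" folder:
	home, _ := os.UserHomeDir()
	path := strings.Replace(cctx.String("journal"), "~", home, 1)
	j, err := fsjournal.OpenFSJournal2(path, de)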
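
One query in CanAccept is worth a closer look when reading: `where task_id IN $1` binds the whole ids slice to a single placeholder, and PostgreSQL does not accept a bare parameter after IN. The usual way to test membership against one array parameter is = ANY($1); whether the slice must first be converted to a plain integer slice depends on how pgx encodes harmonytask.TaskID, so the following is strictly a sketch:

	// Possible shape of the membership query (an assumption, not what this patch ships):
	err = t.db.Select(context.Background(), &tasks,
		`SELECT tskey, task_id, period_start, open, close
		   FROM wdpost_tasks
		  WHERE task_id = ANY($1)`, ids)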
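
Taken together, runCmd registers the task with the harmonytask engine while WdPostTask queues its own work through AddTask and Adder. A rough sketch of that flow using only names from this patch; the assumption that provider.WindowPostScheduler builds the task via NewWdPostTask is mine, and error handling is elided:

	// In runCmd: the scheduler-backed task joins the engine's task list.
	wdPostTask := NewWdPostTask(db, scheduler, cfg.Subsystems.WindowPostMaxTasks)
	activeTasks := []harmonytask.TaskInterface{wdPostTask}
	taskEngine, _ := harmonytask.New(db, activeTasks, listenAddr)

	// In the scheduler: each new deadline is queued on the task's channel...
	_ = wdPostTask.AddTask(ctx, ts, di)
	// ...and Adder(), driven by the engine, persists it as a wdpost_tasks row,
	// where IsErrUniqueContraint makes re-queuing the same deadline a no-op.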