Merge pull request #11358 from filecoin-project/simpleharmony

Simpleharmony
Łukasz Magiera 2023-10-30 12:13:47 +01:00 committed by GitHub
commit 326d83695b
12 changed files with 106 additions and 312 deletions

View File

@ -14,7 +14,6 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
@ -23,6 +22,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
cliutil "github.com/filecoin-project/lotus/cli/util" cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/journal/fsjournal" "github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
@ -78,6 +78,11 @@ var runCmd = &cli.Command{
Usage: "path to json file containing storage config", Usage: "path to json file containing storage config",
Value: "~/.lotus/storage.json", Value: "~/.lotus/storage.json",
}, },
&cli.StringFlag{
Name: "journal",
Usage: "path to journal files",
Value: "~/.lotus/",
},
}, },
Action: func(cctx *cli.Context) (err error) { Action: func(cctx *cli.Context) (err error) {
defer func() { defer func() {
@ -135,31 +140,6 @@ var runCmd = &cli.Command{
if err := r.Init(repo.Provider); err != nil { if err := r.Init(repo.Provider); err != nil {
return err return err
} }
/*
lr, err := r.Lock(repo.Provider)
if err != nil {
return err
}
var localPaths []storiface.LocalPath
if err := lr.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
}); err != nil {
return fmt.Errorf("set storage config: %w", err)
}
{
// init datastore for r.Exists
_, err := lr.Datastore(context.Background(), "/metadata")
if err != nil {
return err
}
}
if err := lr.Close(); err != nil {
return fmt.Errorf("close repo: %w", err)
}
*/
} }
db, err := makeDB(cctx) db, err := makeDB(cctx)
@ -168,16 +148,6 @@ var runCmd = &cli.Command{
} }
shutdownChan := make(chan struct{}) shutdownChan := make(chan struct{})
/* defaults break lockedRepo (below)
stop, err := node.New(ctx,
node.Override(new(dtypes.ShutdownChan), shutdownChan),
node.Provider(r),
)
if err != nil {
return fmt.Errorf("creating node: %w", err)
}
*/
const unspecifiedAddress = "0.0.0.0" const unspecifiedAddress = "0.0.0.0"
listenAddr := cctx.String("listen") listenAddr := cctx.String("listen")
addressSlice := strings.Split(listenAddr, ":") addressSlice := strings.Split(listenAddr, ":")
@ -191,31 +161,15 @@ var runCmd = &cli.Command{
} }
} }
lr, err := r.Lock(repo.Provider) ///////////////////////////////////////////////////////////////////////
if err != nil { ///// Dependency Setup
return err ///////////////////////////////////////////////////////////////////////
}
defer func() { // The config feeds into task runners & their helpers
if err := lr.Close(); err != nil {
log.Error("closing repo", err)
}
}()
if err := lr.SetAPIToken([]byte(listenAddr)); err != nil { // our assigned listen address is our unique token
return xerrors.Errorf("setting api token: %w", err)
}
localStore, err := paths.NewLocal(ctx, &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}, nil, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return err
}
cfg, err := getConfig(cctx, db) cfg, err := getConfig(cctx, db)
if err != nil { if err != nil {
return err return err
} }
// The config feeds into task runners & their helpers
var activeTasks []harmonytask.TaskInterface
var verif storiface.Verifier = ffiwrapper.ProofVerifier var verif storiface.Verifier = ffiwrapper.ProofVerifier
@ -228,19 +182,13 @@ var runCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
j, err := fsjournal.OpenFSJournal(lr, de) j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil { if err != nil {
return err return err
} }
defer j.Close() defer j.Close()
si := paths.NewIndexProxy( /*TODO Alerting*/ nil, db, true) full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.FULLNODE_API_INFO)
lstor, err := paths.NewLocal(ctx, lr, si, nil /*TODO URLs*/)
if err != nil {
return err
}
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.FULLNODE_API_INFO) // TODO switch this into DB entries.
if err != nil { if err != nil {
return err return err
} }
@ -251,8 +199,18 @@ var runCmd = &cli.Command{
return err return err
} }
al := alerting.NewAlertingSystem(j)
si := paths.NewIndexProxy(al, db, true)
bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return err
}
// todo fetch limit config // todo fetch limit config
stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
@ -268,14 +226,21 @@ var runCmd = &cli.Command{
maddrs = append(maddrs, dtypes.MinerAddress(addr)) maddrs = append(maddrs, dtypes.MinerAddress(addr))
} }
///////////////////////////////////////////////////////////////////////
///// Task Selection
///////////////////////////////////////////////////////////////////////
var activeTasks []harmonytask.TaskInterface
{
if cfg.Subsystems.EnableWindowPost { if cfg.Subsystems.EnableWindowPost {
wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si) as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil { if err != nil {
return err return err
} }
activeTasks = append(activeTasks, wdPostTask) activeTasks = append(activeTasks, wdPostTask)
} }
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
if err != nil { if err != nil {
return err return err
@ -283,7 +248,6 @@ var runCmd = &cli.Command{
handler := gin.New() handler := gin.New()
taskEngine.ApplyHttpHandlers(handler.Group("/"))
defer taskEngine.GracefullyTerminate(time.Hour) defer taskEngine.GracefullyTerminate(time.Hour)
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}} fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
@ -316,6 +280,7 @@ var runCmd = &cli.Command{
*/ */
// Monitor for shutdown. // Monitor for shutdown.
// TODO provide a graceful shutdown API on shutdownChan
finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper},
//node.ShutdownHandler{Component: "provider", StopFunc: stop}, //node.ShutdownHandler{Component: "provider", StopFunc: stop},

go.mod
View File

@ -261,6 +261,7 @@ require (
github.com/ipfs/go-verifcid v0.0.2 // indirect github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 // indirect
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 // indirect
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jackc/puddle/v2 v2.2.0 // indirect

go.sum
View File

@ -888,6 +888,8 @@ github.com/ipni/index-provider v0.12.0 h1:R3F6dxxKNv4XkE4GJZNLOG0bDEbBQ/S5iztXwS
github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4= github.com/ipni/index-provider v0.12.0/go.mod h1:GhyrADJp7n06fqoc1djzkvL4buZYHzV8SoWrlxEo5F4=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa h1:s+4MhCQ6YrzisK6hFJUX53drDT4UsSW3DEhKn0ifuHw=
github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk=

View File

@ -79,13 +79,14 @@ func TestHarmonyTasks(t *testing.T) {
toAdd: []int{56, 73}, toAdd: []int{56, 73},
myPersonalTable: map[harmonytask.TaskID]int{}, myPersonalTable: map[harmonytask.TaskID]int{},
} }
harmonytask.POLL_DURATION = time.Millisecond * 100
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1") e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err) require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKINESS RISK HERE. time.Sleep(3 * time.Second) // do the work. FLAKINESS RISK HERE.
e.GracefullyTerminate(time.Minute) e.GracefullyTerminate(time.Minute)
expected := []string{"taskResult56", "taskResult73"} expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted) sort.Strings(t1.WorkCompleted)
require.Equal(t, t1.WorkCompleted, expected, "unexpected results") require.Equal(t, expected, t1.WorkCompleted, "unexpected results")
}) })
} }
@ -253,11 +254,3 @@ func TestTaskRetry(t *testing.T) {
{2, false, "error: intentional 'error'"}}, res) {2, false, "error: intentional 'error'"}}, res)
}) })
} }
/*
FUTURE test fast-pass round-robin via http calls (3party) once the API for that is set
It's necessary for WinningPoSt.
FUTURE test follows.
It's necessary for sealing work.
*/

View File

@ -37,7 +37,11 @@ type fsJournal struct {
// OpenFSJournal constructs a rolling filesystem journal, with a default // OpenFSJournal constructs a rolling filesystem journal, with a default
// per-file size limit of 1GiB. // per-file size limit of 1GiB.
func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal.Journal, error) { func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal.Journal, error) {
dir := filepath.Join(lr.Path(), "journal") return OpenFSJournalPath(lr.Path(), disabled)
}
func OpenFSJournalPath(path string, disabled journal.DisabledEvents) (journal.Journal, error) {
dir := filepath.Join(path, "journal")
if err := os.MkdirAll(dir, 0755); err != nil { if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err) return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
} }
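Callers without a LockedRepo (such as the lotus-provider run command above, which now passes its --journal flag) can use the new constructor directly. A minimal sketch, assuming the journal and fsjournal imports and a hypothetical directory; the files still land in <path>/journal:

	var disabled journal.DisabledEvents // nil: no event types disabled
	j, err := fsjournal.OpenFSJournalPath("/var/lib/lotus-provider", disabled)
	if err != nil {
		return fmt.Errorf("opening journal: %w", err)
	}
	defer j.Close()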

View File

@ -2,9 +2,12 @@ package harmonydb
import ( import (
"context" "context"
"errors"
"github.com/georgysavva/scany/v2/pgxscan" "github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
) )
// rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries. // rawStringOnly is _intentionally_private_ to force only basic strings in SQL queries.
@ -146,3 +149,8 @@ func (t *Tx) QueryRow(sql rawStringOnly, arguments ...any) Row {
func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error { func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error {
return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...) return pgxscan.Select(t.ctx, t.Tx, sliceOfStructPtr, string(sql), arguments...)
} }
func IsErrUniqueContraint(err error) bool {
var e2 *pgconn.PgError
return errors.As(err, &e2) && e2.Code == pgerrcode.UniqueViolation
}
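The new helper lets callers treat a duplicate INSERT as "already there" rather than a hard failure, which is how AddTask uses it further down in this commit. A minimal usage sketch (the table and columns are hypothetical):

	_, err := db.Exec(ctx, `INSERT INTO wdpost_partition_tasks (task_id, sp_id, deadline)
		VALUES ($1, $2, $3)`, taskID, spID, deadline)
	if harmonydb.IsErrUniqueContraint(err) {
		return nil // an identical row already exists; nothing to do
	}
	if err != nil {
		return fmt.Errorf("insert task details: %w", err)
	}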

View File

@ -15,9 +15,8 @@ Mental Model:
- max was specified and reached - max was specified and reached
- resource exhaustion - resource exhaustion
- CanAccept() interface (per-task implementation) does not accept it. - CanAccept() interface (per-task implementation) does not accept it.
Ways tasks start: (slowest first) Ways tasks start:
- DB Read every 1 minute - DB Read every 3 seconds
- Bump via HTTP if registered in DB
- Task was added (to db) by this process - Task was added (to db) by this process
Ways tasks get added: Ways tasks get added:
- Async Listener task (for chain, etc) - Async Listener task (for chain, etc)
@ -68,12 +67,5 @@ harmony_task_machines
anything, but serves as a discovery mechanism. Paths are hostnames + ports anything, but serves as a discovery mechanism. Paths are hostnames + ports
which are presumed to support http, but this assumption is only used by which are presumed to support http, but this assumption is only used by
the task system. the task system.
harmony_task_follow / harmony_task_impl
These tables are used to fast-path notifications to other machines instead
of waiting for polling. _impl helps round-robin work pick-up. _follow helps
discover the machines that are interested in creating tasks following the
task that just completed.
*/ */
package harmonytask package harmonytask
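In practice the consumer side stays as thin as the run command above: implement TaskInterface for each task, hand the implementations to the engine, and let the 3-second poll pick work out of harmony_task. A sketch using only calls that appear in this commit (db, listenAddr and wdPostTask come from the caller):

	activeTasks := []harmonytask.TaskInterface{wdPostTask} // whichever tasks this node enables
	taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
	if err != nil {
		return err
	}
	// work is picked up by the POLL_DURATION loop; on shutdown, give running tasks time to finish
	defer taskEngine.GracefullyTerminate(time.Hour)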

View File

@ -4,20 +4,18 @@ import (
"context" "context"
"fmt" "fmt"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/gin-gonic/gin"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/harmony/resources"
) )
// Consts (except for unit test) // Consts (except for unit test)
var POLL_DURATION = time.Minute // Poll for Work this frequently var POLL_DURATION = time.Second * 3 // Poll for Work this frequently
var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone var CLEANUP_FREQUENCY = 5 * time.Minute // Check for dead workers this often * everyone
var FOLLOW_FREQUENCY = 1 * time.Minute // Check for work to follow this often
type TaskTypeDetails struct { type TaskTypeDetails struct {
// Max returns how many tasks this machine can run of this type. // Max returns how many tasks this machine can run of this type.
@ -76,7 +74,7 @@ type TaskInterface interface {
// func (b *BazType)Adder(addTask AddTaskFunc) { // func (b *BazType)Adder(addTask AddTaskFunc) {
// for { // for {
// bazMaker := <- bazChannel // bazMaker := <- bazChannel
// addTask("baz", func(t harmonytask.TaskID, txn db.Transaction) bool { // addTask("baz", func(t harmonytask.TaskID, txn db.Transaction) (bool, error) {
// _, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot) // _, err := txn.Exec(`INSERT INTO bazInfoTable (taskID, qix, mot)
// VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot) // VALUES ($1,$2,$3)`, id, bazMaker.qix, bazMaker.mot)
// if err != nil { // if err != nil {
@ -90,7 +88,13 @@ type TaskInterface interface {
Adder(AddTaskFunc) Adder(AddTaskFunc)
} }
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (bool, error)) // AddTaskFunc is responsible for adding a task's details "extra info" to the DB.
// It should return true if the task should be added, false if it was already there.
// This is typically accomplished with a "unique" index on your details table that
// would cause the insert to fail.
// A returned error signals not a conflict (which is ignored) but a serious
// problem that needs to be logged with context.
type AddTaskFunc func(extraInfo func(TaskID, *harmonydb.Tx) (shouldCommit bool, seriousError error))
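Spelled out against the new signature, a hypothetical Adder might look like the sketch below; the details table, its columns and the UNIQUE index are made up, and a duplicate insert there is exactly what AddTask later recognises via IsErrUniqueContraint and skips quietly:

	func (b *BazType) Adder(addTask harmonytask.AddTaskFunc) {
		go func() {
			for bazMaker := range b.bazChannel {
				addTask(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
					// baz_info has a UNIQUE constraint; a duplicate insert fails here,
					// rolls the transaction back, and AddTask logs it at debug level.
					_, err := tx.Exec(`INSERT INTO baz_info (task_id, qix, mot)
						VALUES ($1, $2, $3)`, id, bazMaker.qix, bazMaker.mot)
					if err != nil {
						return false, err
					}
					return true, nil // commit: the new harmony_task row becomes visible to pollers
				})
			}
		}()
	}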
type TaskEngine struct { type TaskEngine struct {
ctx context.Context ctx context.Context
@ -101,7 +105,6 @@ type TaskEngine struct {
grace context.CancelFunc grace context.CancelFunc
taskMap map[string]*taskTypeHandler taskMap map[string]*taskTypeHandler
ownerID int ownerID int
tryAllWork chan bool // notify if work completed
follows map[string][]followStruct follows map[string][]followStruct
lastFollowTime time.Time lastFollowTime time.Time
lastCleanup atomic.Value lastCleanup atomic.Value
@ -134,7 +137,6 @@ func New(
reg: reg, reg: reg,
ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort" ownerID: reg.Resources.MachineID, // The current number representing "hostAndPort"
taskMap: make(map[string]*taskTypeHandler, len(impls)), taskMap: make(map[string]*taskTypeHandler, len(impls)),
tryAllWork: make(chan bool),
follows: make(map[string][]followStruct), follows: make(map[string][]followStruct),
} }
e.lastCleanup.Store(time.Now()) e.lastCleanup.Store(time.Now())
@ -146,23 +148,6 @@ func New(
} }
e.handlers = append(e.handlers, &h) e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h e.taskMap[h.TaskTypeDetails.Name] = &h
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_impl (owner_id, name)
VALUES ($1,$2)`, e.ownerID, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update impl: %w", err)
}
for name, fn := range c.TypeDetails().Follows {
e.follows[name] = append(e.follows[name], followStruct{fn, &h, name})
// populate harmony_task_follows
_, err := db.Exec(e.ctx, `INSERT INTO harmony_task_follows (owner_id, from_task, to_task)
VALUES ($1,$2,$3)`, e.ownerID, name, h.Name)
if err != nil {
return nil, fmt.Errorf("can't update harmony_task_follows: %w", err)
}
}
} }
// resurrect old work // resurrect old work
@ -206,18 +191,6 @@ func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
e.grace() e.grace()
e.reg.Shutdown() e.reg.Shutdown()
deadlineChan := time.NewTimer(deadline).C deadlineChan := time.NewTimer(deadline).C
ctx := context.TODO()
// block bumps & follows by unreg from DBs.
_, err := e.db.Exec(ctx, `DELETE FROM harmony_task_impl WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
_, err = e.db.Exec(ctx, `DELETE FROM harmony_task_follow WHERE owner_id=$1`, e.ownerID)
if err != nil {
log.Warn("Could not clean-up impl table: %w", err)
}
top: top:
for _, h := range e.handlers { for _, h := range e.handlers {
if h.Count.Load() > 0 { if h.Count.Load() > 0 {
@ -235,17 +208,18 @@ top:
func (e *TaskEngine) poller() { func (e *TaskEngine) poller() {
for { for {
select { select {
case <-e.tryAllWork: ///////////////////// Find work after some work finished
case <-time.NewTicker(POLL_DURATION).C: // Find work periodically case <-time.NewTicker(POLL_DURATION).C: // Find work periodically
case <-e.ctx.Done(): ///////////////////// Graceful exit case <-e.ctx.Done(): ///////////////////// Graceful exit
return return
} }
e.followWorkInDB() // "Follows" the slow way e.pollerTryAllWork()
e.pollerTryAllWork() // "Bumps" (round robin tasks) the slow way if time.Since(e.lastFollowTime) > FOLLOW_FREQUENCY {
e.followWorkInDB()
}
} }
} }
// followWorkInDB implements "Follows" the slow way // followWorkInDB implements "Follows"
func (e *TaskEngine) followWorkInDB() { func (e *TaskEngine) followWorkInDB() {
// Step 1: What are we following? // Step 1: What are we following?
var lastFollowTime time.Time var lastFollowTime time.Time
@ -286,14 +260,13 @@ func (e *TaskEngine) followWorkInDB() {
} }
} }
// pollerTryAllWork implements "Bumps" (next task) the slow way // pollerTryAllWork starts at most one next task per poll
func (e *TaskEngine) pollerTryAllWork() { func (e *TaskEngine) pollerTryAllWork() {
if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY { if time.Since(e.lastCleanup.Load().(time.Time)) > CLEANUP_FREQUENCY {
e.lastCleanup.Store(time.Now()) e.lastCleanup.Store(time.Now())
resources.CleanupMachines(e.ctx, e.db) resources.CleanupMachines(e.ctx, e.db)
} }
for _, v := range e.handlers { for _, v := range e.handlers {
rerun:
if v.AssertMachineHasCapacity() != nil { if v.AssertMachineHasCapacity() != nil {
continue continue
} }
@ -306,90 +279,12 @@ func (e *TaskEngine) pollerTryAllWork() {
log.Error("Unable to read work ", err) log.Error("Unable to read work ", err)
continue continue
} }
if len(unownedTasks) > 0 {
accepted := v.considerWork("poller", unownedTasks) accepted := v.considerWork("poller", unownedTasks)
if !accepted { if accepted {
log.Warn("Work not accepted") return // accept new work slowly and in priority order
continue
} }
if len(unownedTasks) > 1 { log.Warn("Work not accepted for " + strconv.Itoa(len(unownedTasks)) + " " + v.Name + " task(s)")
e.bump(v.Name) // wait for others before trying again to add work.
goto rerun
}
}
}
// ApplyHttpHandlers needs to be used by the http server to register routes.
// This implements the receiver-side of "follows" and "bumps" the fast way.
func (e *TaskEngine) ApplyHttpHandlers(root gin.IRouter) {
s := root.Group("/scheduler")
f := s.Group("/follows")
b := s.Group("/bump")
for name, vs := range e.follows {
name, vs := name, vs
f.GET("/"+name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.JSON(401, map[string]any{"error": err.Error()})
return
}
taskAdded := false
for _, vTmp := range vs {
v := vTmp
b, err := v.f(TaskID(tID), v.h.AddTask)
if err != nil {
log.Errorw("Follow attempt failed", "error", err, "from", name, "to", v.name)
}
taskAdded = taskAdded || b
}
if taskAdded {
e.tryAllWork <- true
c.Status(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
})
}
for _, hTmp := range e.handlers {
h := hTmp
b.GET("/"+h.Name+"/:tID", func(c *gin.Context) {
tIDString := c.Param("tID")
tID, err := strconv.Atoi(tIDString)
if err != nil {
c.JSON(401, map[string]any{"error": err.Error()})
return
}
// We NEED to block while trying to deliver
// this work to ease the network impact.
if h.considerWork("bump", []TaskID{TaskID(tID)}) {
c.Status(200)
return
}
c.Status(202) // NOTE: 202 for "accepted" but not worked.
})
}
}
func (e *TaskEngine) bump(taskType string) {
var res []string
err := e.db.Select(e.ctx, &res, `SELECT host_and_port FROM harmony_machines m
JOIN harmony_task_impl i ON i.owner_id=m.id
WHERE i.name=$1`, taskType)
if err != nil {
log.Error("Could not read db for bump: ", err)
return
}
for _, url := range res {
if !strings.HasPrefix(strings.ToLower(url), "http") {
url = "http://"
}
resp, err := hClient.Get(url + "/scheduler/bump/" + taskType)
if err != nil {
log.Info("Server unreachable to bump: ", err)
continue
}
if resp.StatusCode == 200 {
return // just want 1 taker.
} }
} }
} }

View File

@ -4,15 +4,12 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"net/http"
"runtime" "runtime"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/jackc/pgx/v5/pgconn"
"github.com/samber/lo" "github.com/samber/lo"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
@ -29,7 +26,7 @@ type taskTypeHandler struct {
func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) { func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error)) {
var tID TaskID var tID TaskID
did, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) { _, err := h.TaskEngine.db.BeginTransaction(h.TaskEngine.ctx, func(tx *harmonydb.Tx) (bool, error) {
// create taskID (from DB) // create taskID (from DB)
_, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time) _, err := tx.Exec(`INSERT INTO harmony_task (name, added_by, posted_time)
VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID) VALUES ($1, $2, CURRENT_TIMESTAMP) `, h.Name, h.TaskEngine.ownerID)
@ -44,21 +41,13 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error
}) })
if err != nil { if err != nil {
var pgErr *pgconn.PgError if harmonydb.IsErrUniqueContraint(err) {
if errors.As(err, &pgErr) && pgErr.ConstraintName != "" { log.Debugf("addtask(%s) saw unique constraint, so it's added already.", h.Name)
log.Debug("addtask saw unique constraint ", pgErr.ConstraintName, ": so it's added already.")
return return
} }
log.Error("Could not add task. AddTasFunc failed: %v", err) log.Error("Could not add task. AddTasFunc failed: %v", err)
return return
} }
if !did {
return
}
if !h.considerWork("adder", []TaskID{tID}) {
h.TaskEngine.bump(h.Name) // We can't do it. How about someone else.
}
} }
func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) { func (h *taskTypeHandler) considerWork(from string, ids []TaskID) (workAccepted bool) {
@ -112,8 +101,8 @@ top:
goto top goto top
} }
go func() {
h.Count.Add(1) h.Count.Add(1)
go func() {
log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name) log.Infow("Beginning work on Task", "id", *tID, "from", from, "name", h.Name)
var done bool var done bool
@ -132,10 +121,12 @@ top:
h.recordCompletion(*tID, workStart, done, doErr) h.recordCompletion(*tID, workStart, done, doErr)
if done { if done {
h.triggerCompletionListeners(*tID) for _, fs := range h.TaskEngine.follows[h.Name] { // Do we know of any follows for this task type?
if _, err := fs.f(*tID, fs.h.AddTask); err != nil {
log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name)
}
}
} }
h.TaskEngine.tryAllWork <- true // Activate tasks in this machine
}() }()
done, doErr = h.Do(*tID, func() bool { done, doErr = h.Do(*tID, func() bool {
@ -244,65 +235,3 @@ func (h *taskTypeHandler) AssertMachineHasCapacity() error {
enoughGpuRam: enoughGpuRam:
return nil return nil
} }
var hClient = http.Client{}
func init() {
hClient.Timeout = 3 * time.Second
}
// triggerCompletionListeners does in order:
// 1. Trigger all in-process followers (b/c it's fast).
// 2. Trigger all living processes with followers via DB
// 3. Future followers (think partial upgrade) can read harmony_task_history
// 3a. The Listen() handles slow follows.
func (h *taskTypeHandler) triggerCompletionListeners(tID TaskID) {
// InProcess (#1 from Description)
inProcessDefs := h.TaskEngine.follows[h.Name]
inProcessFollowers := make([]string, len(inProcessDefs))
for _, fs := range inProcessDefs {
b, err := fs.f(tID, fs.h.AddTask)
if err != nil {
log.Error("Could not follow", "error", err, "from", h.Name, "to", fs.name)
}
if b {
inProcessFollowers = append(inProcessFollowers, fs.h.Name)
}
}
// Over HTTP (#2 from Description)
var hps []struct {
HostAndPort string
ToType string
}
err := h.TaskEngine.db.Select(h.TaskEngine.ctx, &hps, `SELECT m.host_and_port, to_type
FROM harmony_task_follow f JOIN harmony_machines m ON m.id=f.owner_id
WHERE from_type=$1 AND to_type NOT IN $2 AND f.owner_id != $3`,
h.Name, inProcessFollowers, h.TaskEngine.ownerID)
if err != nil {
log.Warn("Could not fast-trigger partner processes.", err)
return
}
hostsVisited := map[string]bool{}
tasksVisited := map[string]bool{}
for _, v := range hps {
if hostsVisited[v.HostAndPort] || tasksVisited[v.ToType] {
continue
}
resp, err := hClient.Get(v.HostAndPort + "/scheduler/follows/" + h.Name)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Warn("Couldn't hit http endpoint: ", err)
continue
}
hostsVisited[v.HostAndPort], tasksVisited[v.ToType] = true, true
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
log.Error("IO failed for fast nudge: ", string(b))
continue
}
}
}

View File

@ -2,9 +2,10 @@ package provider
import ( import (
"context" "context"
"time"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -22,12 +23,12 @@ var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex) (*lpwindow.WdPostTask, error) { as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) {
chainSched := chainsched.New(api) chainSched := chainsched.New(api)
// todo config // todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
return lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr) return lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
} }

View File

@ -2,12 +2,13 @@ package lpwindow
import ( import (
"context" "context"
"sort"
"time"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"sort"
"time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo" "github.com/samber/lo"
@ -69,6 +70,7 @@ type WdPostTask struct {
windowPoStTF promise.Promise[harmonytask.AddTaskFunc] windowPoStTF promise.Promise[harmonytask.AddTaskFunc]
actors []dtypes.MinerAddress actors []dtypes.MinerAddress
max int
} }
type wdTaskIdentity struct { type wdTaskIdentity struct {
@ -272,7 +274,7 @@ var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails { func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{ return harmonytask.TaskTypeDetails{
Name: "WdPost", Name: "WdPost",
Max: 1, // TODO Max: t.max,
MaxFailures: 3, MaxFailures: 3,
Follows: nil, Follows: nil,
Cost: resources.Resources{ Cost: resources.Resources{
@ -348,6 +350,7 @@ func NewWdPostTask(db *harmonydb.DB,
pcs *chainsched.ProviderChainSched, pcs *chainsched.ProviderChainSched,
actors []dtypes.MinerAddress, actors []dtypes.MinerAddress,
max int,
) (*WdPostTask, error) { ) (*WdPostTask, error) {
t := &WdPostTask{ t := &WdPostTask{
db: db, db: db,
@ -358,6 +361,7 @@ func NewWdPostTask(db *harmonydb.DB,
verifier: verifier, verifier: verifier,
actors: actors, actors: actors,
max: max,
} }
if err := pcs.AddHandler(t.processHeadChange); err != nil { if err := pcs.AddHandler(t.processHeadChange); err != nil {

View File