diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index 8e8be00db..380923b07 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -68,12 +68,6 @@ func main() { Version: build.UserVersion(), EnableBashCompletion: true, Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "listen", - Usage: "host address and port the worker api will listen on", - Value: "0.0.0.0:3456", - EnvVars: []string{"LOTUS_WORKER_LISTEN"}, - }, &cli.BoolFlag{ // examined in the Before above Name: "color", diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 603ad4028..c08868022 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -25,7 +25,6 @@ import ( "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -36,8 +35,10 @@ var runCmd = &cli.Command{ Usage: "Start a lotus provider process", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "provider-api", - Usage: "Port (default 12300)", + Name: "listen", + Usage: "host address and port the worker api will listen on", + Value: "0.0.0.0:12300", + EnvVars: []string{"LOTUS_WORKER_LISTEN"}, }, &cli.BoolFlag{ Name: "nosync", @@ -80,6 +81,10 @@ var runCmd = &cli.Command{ Hidden: true, Value: "5433", }, + &cli.StringFlag{ + Name: FlagProviderRepo, + Value: "~/lotus", + }, }, Action: func(cctx *cli.Context) error { if !cctx.Bool("enable-gpu-proving") { @@ -153,29 +158,21 @@ var runCmd = &cli.Command{ } } - lr, err := r.Lock(repo.Provider) - if err != nil { - return err + dbConfig := config.HarmonyDB{ + Username: cctx.String("db-user"), + Password: cctx.String("db-password"), + Hosts: strings.Split(cctx.String("db-host"), ","), + Database: cctx.String("db-name"), + Port: cctx.String("db-port"), } - defer func() { - if err := lr.Close(); err != nil { - log.Error("closing repo", err) - } - }() - - db, err := harmonydb.NewFromConfig(config.HarmonyDB{ - Username: cctx.String("db_user"), - Password: cctx.String("db_password"), - Hosts: strings.Split(cctx.String("db_host"), ","), - Database: cctx.String("db_name"), - Port: cctx.String("db_port"), - }) + db, err := harmonydb.NewFromConfig(dbConfig) if err != nil { return err } shutdownChan := make(chan struct{}) + /* defaults break lockedRepo (below) stop, err := node.New(ctx, node.Override(new(dtypes.ShutdownChan), shutdownChan), node.Provider(r), @@ -183,6 +180,7 @@ var runCmd = &cli.Command{ if err != nil { return fmt.Errorf("creating node: %w", err) } + */ const unspecifiedAddress = "0.0.0.0" address := cctx.String("listen") @@ -196,6 +194,16 @@ var runCmd = &cli.Command{ address = rip + ":" + addressSlice[1] } } + + lr, err := r.Lock(repo.Provider) + if err != nil { + return err + } + defer func() { + if err := lr.Close(); err != nil { + log.Error("closing repo", err) + } + }() localStore, err := paths.NewLocal(ctx, lr, nil, []string{"http://" + address + "/remote"}) if err != nil { return err @@ -239,7 +247,7 @@ var runCmd = &cli.Command{ // Monitor for shutdown. finishCh := node.MonitorShutdown(shutdownChan, node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, - node.ShutdownHandler{Component: "provider", StopFunc: stop}, + //node.ShutdownHandler{Component: "provider", StopFunc: stop}, ) <-finishCh diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index ab8de50b9..3f970a90e 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -11,6 +11,7 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + "github.com/jackc/pgx/v5/pgconn" "github.com/samber/lo" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" @@ -36,12 +37,19 @@ func (h *taskTypeHandler) AddTask(extra func(TaskID, *harmonydb.Tx) (bool, error } err = tx.QueryRow("SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1").Scan(&tID) if err != nil { - log.Error("Could not select ID: ", err) + return false, fmt.Errorf("Could not select ID: %v", err) } return extra(tID, tx) }) + if err != nil { - log.Error(err) + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.ConstraintName != "" { + log.Debug("addtask saw unique constraint ", pgErr.ConstraintName, ": so it's added already.") + return + } + log.Error("Could not add task. AddTasFunc failed: %v", err) + return } if !did { return diff --git a/node/builder.go b/node/builder.go index ce18747ec..e116fd807 100644 --- a/node/builder.go +++ b/node/builder.go @@ -343,21 +343,11 @@ func Repo(r repo.Repo) Option { func Provider(r repo.Repo) Option { return func(settings *Settings) error { - lr, err := r.Lock(settings.nodeType) - if err != nil { - return err - } - c, err := lr.Config() - if err != nil { - return err - } - _ = c return Options( func(s *Settings) error { s.Base = true; return nil }, // mark Base as applied ApplyIf(func(s *Settings) bool { return s.Config }, Error(errors.New("the Base() option must be set before Config option")), ), - Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing //ApplyIf(IsType(repo.WdPost), ConfigWdPost(c)), //ApplyIf(IsType(repo.WinPost), ConfigWinPost(c)), )(settings)