fixes: mk instl, cfg defaults, cli tester

This commit is contained in:
Andrew Jackson (Ajax) 2023-11-14 19:00:23 -06:00
parent 3864831344
commit 793b078e5f
9 changed files with 291 additions and 123 deletions

View File

@ -128,7 +128,7 @@ an existing lotus binary in your PATH. This may cause problems if you don't run
.PHONY: build
install: install-daemon install-miner install-worker
install: install-daemon install-miner install-worker install-provider
install-daemon:
install -C ./lotus /usr/local/bin/lotus

View File

@ -12,6 +12,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
@ -42,19 +43,26 @@ var configDefaultCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
c := config.DefaultLotusProvider()
cb, err := config.ConfigUpdate(c, nil, config.Commented(!cctx.Bool("no-comment")), config.DefaultKeepUncommented(), config.NoEnv())
comment := !cctx.Bool("no-comment")
cfg, err := getDefaultConfig(comment)
if err != nil {
return err
}
fmt.Print(string(cb))
fmt.Print(cfg)
return nil
},
}
func getDefaultConfig(comment bool) (string, error) {
c := config.DefaultLotusProvider()
cb, err := config.ConfigUpdate(c, nil, config.Commented(comment), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
return "", err
}
return string(cb), nil
}
var configSetCmd = &cli.Command{
Name: "set",
Aliases: []string{"add"},
@ -190,7 +198,7 @@ var configRmCmd = &cli.Command{
var configViewCmd = &cli.Command{
Name: "interpret",
Aliases: []string{"view", "stacked", "stack"},
Usage: "Interpret stacked config layers by this version of lotus-provider.",
Usage: "Interpret stacked config layers by this version of lotus-provider, with system-generated comments.",
ArgsUsage: "a list of layers to be interpreted as the final config",
Flags: []cli.Flag{
&cli.StringSliceFlag{
@ -209,10 +217,12 @@ var configViewCmd = &cli.Command{
if err != nil {
return err
}
e := toml.NewEncoder(os.Stdout)
e.Indent = " "
return e.Encode(lp)
cb, err := config.ConfigUpdate(lp, config.DefaultLotusProvider(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
return xerrors.Errorf("cannot interpret config: %w", err)
}
fmt.Println(string(cb))
return nil
},
}

View File

@ -43,6 +43,7 @@ func main() {
runCmd,
stopCmd,
configCmd,
provingCmd,
//backupCmd,
//lcli.WithCategory("chain", actorCmd),
//lcli.WithCategory("storage", sectorsCmd),

View File

@ -161,7 +161,11 @@ environment variable LOTUS_WORKER_WINDOWPOST.
}
if !lo.Contains(titles, "base") {
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '')", "base")
cfg, err := getDefaultConfig(true)
if err != nil {
return xerrors.Errorf("Cannot get default config: %w", err)
}
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '$1')", cfg)
if err != nil {
return err
}

View File

@ -0,0 +1,105 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/provider"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)
var provingCmd = &cli.Command{
Name: "proving",
Usage: "Utility functions for proving sectors",
Subcommands: []*cli.Command{
//provingInfoCmd,
provingCompute,
},
}
var provingCompute = &cli.Command{
Name: "compute",
Usage: "Compute a proof-of-spacetime for a sector (requires the sector to be pre-sealed)",
Subcommands: []*cli.Command{
provingComputeWindowPoStCmd,
},
}
var provingComputeWindowPoStCmd = &cli.Command{
Name: "windowed-post",
Aliases: []string{"window-post"},
Usage: "Compute WindowPoSt for performance and configuration testing.",
Description: `Note: This command is intended to be used to verify PoSt compute performance.
It will not send any messages to the chain. Since it can compute any deadline, output may be incorrectly timed for the chain.`,
ArgsUsage: "[deadline index]",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "deadline",
Usage: "deadline to compute WindowPoSt for ",
Value: 0,
},
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
Value: cli.NewStringSlice("base"),
},
&cli.StringFlag{
Name: "storage-json",
Usage: "path to json file containing storage config",
Value: "~/.lotus-provider/storage.json",
},
&cli.Uint64Flag{
Name: "partition",
Usage: "partition to compute WindowPoSt for",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
deps, err := getDeps(cctx, ctx)
if err != nil {
return err
}
defer deps.fullCloser()
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw,
deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
_, _ = wdPoStSubmitTask, derlareRecoverTask
if len(deps.maddrs) == 0 {
return errors.New("no miners to compute WindowPoSt for")
}
head, err := deps.full.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("failed to get chain head: %w", err)
}
di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0)
for _, maddr := range deps.maddrs {
out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition"))
if err != nil {
fmt.Println("Error computing WindowPoSt for miner", maddr, err)
continue
}
fmt.Println("Computed WindowPoSt for miner", maddr, ":")
err = json.NewEncoder(os.Stdout).Encode(out)
if err != nil {
fmt.Println("Could not encode WindowPoSt output for miner", maddr, err)
continue
}
}
return nil
},
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"encoding/base64"
"fmt"
"net"
@ -15,12 +16,14 @@ import (
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-statestore"
@ -40,6 +43,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
@ -129,116 +133,13 @@ var runCmd = &cli.Command{
}
}
// Open repo
repoPath := cctx.String(FlagRepoPath)
fmt.Println("repopath", repoPath)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
if err := r.Init(repo.Provider); err != nil {
return err
}
}
db, err := makeDB(cctx)
if err != nil {
return err
}
shutdownChan := make(chan struct{})
const unspecifiedAddress = "0.0.0.0"
listenAddr := cctx.String("listen")
addressSlice := strings.Split(listenAddr, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
rip, err := db.GetRoutableIP()
deps, err := getDeps(cctx, ctx)
if err != nil {
return err
}
listenAddr = rip + ":" + addressSlice[1]
}
}
///////////////////////////////////////////////////////////////////////
///// Dependency Setup
///////////////////////////////////////////////////////////////////////
// The config feeds into task runners & their helpers
cfg, err := getConfig(cctx, db)
if err != nil {
return err
}
log.Debugw("config", "config", cfg)
var verif storiface.Verifier = ffiwrapper.ProofVerifier
as, err := provider.AddressSelector(&cfg.Addresses)()
if err != nil {
return err
}
de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents)
if err != nil {
return err
}
j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil {
return err
}
defer j.Close()
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo)
if err != nil {
return err
}
defer fullCloser()
sa, err := StorageAuth(cfg.Apis.StorageRPCSecret)
if err != nil {
return xerrors.Errorf(`'%w' while parsing the config toml's
[Apis]
StorageRPCSecret=%v
Get it from the JSON documents in ~/.lotus-miner/keystore called .PrivateKey`, err, cfg.Apis.StorageRPCSecret)
}
al := alerting.NewAlertingSystem(j)
si := paths.NewIndexProxy(al, db, true)
bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return err
}
stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))
// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates)
var maddrs []dtypes.MinerAddress
for _, s := range cfg.Addresses.MinerAddresses {
addr, err := address.NewFromString(s)
if err != nil {
return err
}
maddrs = append(maddrs, dtypes.MinerAddress(addr))
}
log.Infow("providers handled", "maddrs", maddrs)
cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore
defer deps.fullCloser()
///////////////////////////////////////////////////////////////////////
///// Task Selection
@ -255,7 +156,11 @@ Get it from the JSON documents in ~/.lotus-miner/keystore called .PrivateKey`, e
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask)
}
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))
taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr)
if err != nil {
return err
}
@ -345,3 +250,144 @@ func StorageAuth(apiKey string) (sealer.StorageAuth, error) {
headers.Add("Authorization", "Bearer "+string(token))
return sealer.StorageAuth(headers), nil
}
type Deps struct {
cfg *config.LotusProviderConfig
db *harmonydb.DB
full api.FullNode
fullCloser jsonrpc.ClientCloser
verif storiface.Verifier
lw *sealer.LocalWorker
as *ctladdr.AddressSelector
maddrs []dtypes.MinerAddress
stor *paths.Remote
si *paths.IndexProxy
localStore *paths.Local
listenAddr string
}
func getDeps(cctx *cli.Context, ctx context.Context) (*Deps, error) {
// Open repo
repoPath := cctx.String(FlagRepoPath)
fmt.Println("repopath", repoPath)
r, err := repo.NewFS(repoPath)
if err != nil {
return nil, err
}
ok, err := r.Exists()
if err != nil {
return nil, err
}
if !ok {
if err := r.Init(repo.Provider); err != nil {
return nil, err
}
}
db, err := makeDB(cctx)
if err != nil {
return nil, err
}
///////////////////////////////////////////////////////////////////////
///// Dependency Setup
///////////////////////////////////////////////////////////////////////
// The config feeds into task runners & their helpers
cfg, err := getConfig(cctx, db)
if err != nil {
return nil, err
}
log.Debugw("config", "config", cfg)
var verif storiface.Verifier = ffiwrapper.ProofVerifier
as, err := provider.AddressSelector(&cfg.Addresses)()
if err != nil {
return nil, err
}
de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents)
if err != nil {
return nil, err
}
j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil {
return nil, err
}
defer j.Close()
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo)
if err != nil {
return nil, err
}
defer fullCloser()
sa, err := StorageAuth(cfg.Apis.StorageRPCSecret)
if err != nil {
return nil, xerrors.Errorf(`'%w' while parsing the config toml's
[Apis]
StorageRPCSecret=%v
Get it from the JSON documents in ~/.lotus-miner/keystore called .PrivateKey`, err, cfg.Apis.StorageRPCSecret)
}
al := alerting.NewAlertingSystem(j)
si := paths.NewIndexProxy(al, db, true)
bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
listenAddr := cctx.String("listen")
const unspecifiedAddress = "0.0.0.0"
addressSlice := strings.Split(listenAddr, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
rip, err := db.GetRoutableIP()
if err != nil {
return nil, err
}
listenAddr = rip + ":" + addressSlice[1]
}
}
localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return nil, err
}
stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))
// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates)
var maddrs []dtypes.MinerAddress
for _, s := range cfg.Addresses.MinerAddresses {
addr, err := address.NewFromString(s)
if err != nil {
return nil, err
}
maddrs = append(maddrs, dtypes.MinerAddress(addr))
}
return &Deps{ // lint: intentionally not-named so it will fail if one is forgotten
cfg,
db,
full,
fullCloser,
verif,
lw,
as,
maddrs,
stor,
si,
localStore,
listenAddr,
}, nil
}

View File

@ -31,7 +31,7 @@ import (
const disablePreChecks = false // todo config
func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) {
func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("recover: %s", r)

View File

@ -150,7 +150,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, err
}
postOut, err := t.doPartition(context.Background(), ts, maddr, deadline, partIdx)
postOut, err := t.DoPartition(context.Background(), ts, maddr, deadline, partIdx)
if err != nil {
log.Errorf("WdPostTask.Do() failed to doPartition: %v", err)
return false, err

View File

@ -1,4 +1,4 @@
package lpwindow
package lpwindow_test
import (
"testing"
@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/provider/lpwindow"
)
// test to create WDPostTask, invoke AddTask and check if the task is added to the DB
@ -23,7 +24,8 @@ func TestAddTask(t *testing.T) {
Database: "yugabyte",
})
require.NoError(t, err)
wdPostTask := NewWdPostTask(db, nil, 0)
wdPostTask, err := lpwindow.NewWdPostTask(db, nil, 0)
require.NoError(t, err)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
_ = taskEngine
ts := types.TipSet{}