Merge pull request #11419 from filecoin-project/fix-sturdy-tests

Fixes for lotus-provider, and conveniences
Łukasz Magiera, 2023-11-21 23:24:16 +01:00 (committed by GitHub)
commit aa2640dda3
26 changed files with 498 additions and 227 deletions

View File

@ -1030,7 +1030,7 @@ workflows:
requires:
- build
suite: utest-unit-rest
target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..."
target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./tools/..."
- test:
name: test-unit-storage

View File

@ -128,7 +128,7 @@ an existing lotus binary in your PATH. This may cause problems if you don't run
.PHONY: build
install: install-daemon install-miner install-worker
install: install-daemon install-miner install-worker install-provider
install-daemon:
install -C ./lotus /usr/local/bin/lotus

Binary file not shown.

View File

@ -1,2 +1,2 @@
/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWNwAkUtWuLtKCyyFP2vBzmpTHSrQao7KQx7Xfa8YvSg1N
/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWPn8BDeNcctAcAGuxxiic8uMw2pAi3G5vgdFtfgRs5zBu
/dns4/bootstrap-0.butterfly.fildev.network/tcp/1347/p2p/12D3KooWRaoPgwJuZdPSN4A2iTeh8xzkZGCEBxan9vMkidHisUgn
/dns4/bootstrap-1.butterfly.fildev.network/tcp/1347/p2p/12D3KooWMjLCZeEf3VzSWvQYuhe9VzCcrN6RENX9FgmQqiJfQDWs

Binary file not shown.

View File

@ -847,7 +847,8 @@ var NetStatCmd = &cli.Command{
})
for _, stat := range stats {
printScope(&stat.stat, name+stat.name)
tmp := stat.stat
printScope(&tmp, name+stat.name)
}
}
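
The tmp copy above sidesteps Go's implicit memory aliasing of range variables (the same pattern recurs in the proving-check and terminations hunks later in this commit); taking the address of a field of the loop variable is what linters such as gosec G601 flag. A standalone illustration, not taken from Lotus, showing the failure mode under Go versions before 1.22's per-iteration loop variables:

package main

import "fmt"

type scopeStat struct{ memory int }

type namedStat struct {
    name string
    stat scopeStat
}

func main() {
    stats := []namedStat{{"a", scopeStat{1}}, {"b", scopeStat{2}}}

    var kept []*scopeStat
    for _, stat := range stats {
        // &stat.stat points into the single loop variable, so every saved
        // pointer would alias the same storage; copying first avoids that.
        tmp := stat.stat
        kept = append(kept, &tmp)
    }
    fmt.Println(*kept[0], *kept[1]) // {1} {2}; with &stat.stat both print {2}
}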

View File

@ -559,7 +559,8 @@ var provingCheckProvableCmd = &cli.Command{
for parIdx, par := range partitions {
sectors := make(map[abi.SectorNumber]struct{})
sectorInfos, err := api.StateMinerSectors(ctx, addr, &par.LiveSectors, types.EmptyTSK)
tmp := par.LiveSectors
sectorInfos, err := api.StateMinerSectors(ctx, addr, &tmp, types.EmptyTSK)
if err != nil {
return err
}

View File

@ -12,6 +12,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
@ -42,19 +43,26 @@ var configDefaultCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
c := config.DefaultLotusProvider()
cb, err := config.ConfigUpdate(c, nil, config.Commented(!cctx.Bool("no-comment")), config.DefaultKeepUncommented(), config.NoEnv())
comment := !cctx.Bool("no-comment")
cfg, err := getDefaultConfig(comment)
if err != nil {
return err
}
fmt.Print(string(cb))
fmt.Print(cfg)
return nil
},
}
func getDefaultConfig(comment bool) (string, error) {
c := config.DefaultLotusProvider()
cb, err := config.ConfigUpdate(c, nil, config.Commented(comment), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
return "", err
}
return string(cb), nil
}
var configSetCmd = &cli.Command{
Name: "set",
Aliases: []string{"add"},
@ -190,7 +198,7 @@ var configRmCmd = &cli.Command{
var configViewCmd = &cli.Command{
Name: "interpret",
Aliases: []string{"view", "stacked", "stack"},
Usage: "Interpret stacked config layers by this version of lotus-provider.",
Usage: "Interpret stacked config layers by this version of lotus-provider, with system-generated comments.",
ArgsUsage: "a list of layers to be interpreted as the final config",
Flags: []cli.Flag{
&cli.StringSliceFlag{
@ -209,10 +217,12 @@ var configViewCmd = &cli.Command{
if err != nil {
return err
}
e := toml.NewEncoder(os.Stdout)
e.Indent = " "
return e.Encode(lp)
cb, err := config.ConfigUpdate(lp, config.DefaultLotusProvider(), config.Commented(true), config.DefaultKeepUncommented(), config.NoEnv())
if err != nil {
return xerrors.Errorf("cannot interpret config: %w", err)
}
fmt.Println(string(cb))
return nil
},
}

View File

@ -43,6 +43,7 @@ func main() {
runCmd,
stopCmd,
configCmd,
testCmd,
//backupCmd,
//lcli.WithCategory("chain", actorCmd),
//lcli.WithCategory("storage", sectorsCmd),

View File

@ -181,7 +181,12 @@ environment variable LOTUS_WORKER_WINDOWPOST.
}
if !lo.Contains(titles, "base") {
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '')")
cfg, err := getDefaultConfig(true)
if err != nil {
return xerrors.Errorf("Cannot get default config: %w", err)
}
_, err = db.Exec(ctx, "INSERT INTO harmony_config (title, config) VALUES ('base', '$1')", cfg)
if err != nil {
return err
}

View File

@ -0,0 +1,180 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/provider"
)
var testCmd = &cli.Command{
Name: "test",
Usage: "Utility functions for testing",
Subcommands: []*cli.Command{
//provingInfoCmd,
wdPostCmd,
},
}
var wdPostCmd = &cli.Command{
Name: "window-post",
Aliases: []string{"wd", "windowpost", "wdpost"},
Usage: "Compute a proof-of-spacetime for a sector (requires the sector to be pre-sealed). These will not send to the chain.",
Subcommands: []*cli.Command{
wdPostHereCmd,
wdPostTaskCmd,
},
}
var wdPostTaskCmd = &cli.Command{
Name: "task",
Aliases: []string{"scheduled", "schedule", "async", "asynchronous"},
Usage: "Test the windowpost scheduler by running it on the next available lotus-provider. ",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "deadline",
Usage: "deadline to compute WindowPoSt for ",
Value: 0,
},
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
Value: cli.NewStringSlice("base"),
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
deps, err := getDeps(ctx, cctx)
if err != nil {
return err
}
ts, err := deps.full.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("cannot get chainhead %w", err)
}
ht := ts.Height()
addr, err := address.NewFromString(deps.cfg.Addresses.MinerAddresses[0])
if err != nil {
return xerrors.Errorf("cannot get miner address %w", err)
}
maddr, err := address.IDFromAddress(addr)
if err != nil {
return xerrors.Errorf("cannot get miner id %w", err)
}
did, err := deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
_, err = tx.Exec(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123)`)
if err != nil {
log.Error("inserting harmony_task: ", err)
return false, xerrors.Errorf("inserting harmony_task: %w", err)
}
var id int64
if err = tx.QueryRow(`SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1`).Scan(&id); err != nil {
log.Error("getting inserted id: ", err)
return false, xerrors.Errorf("getting inserted id: %w", err)
}
_, err = tx.Exec(`INSERT INTO wdpost_partition_tasks
(task_id, sp_id, proving_period_start, deadline_index, partition_index) VALUES ($1, $2, $3, $4, $5)`,
id, maddr, ht, cctx.Uint64("deadline"), 0)
if err != nil {
log.Error("inserting wdpost_partition_tasks: ", err)
return false, xerrors.Errorf("inserting wdpost_partition_tasks: %w", err)
}
_, err = tx.Exec("INSERT INTO harmony_test (task_id) VALUES ($1)", id)
if err != nil {
return false, xerrors.Errorf("inserting into harmony_tests: %w", err)
}
return true, nil
})
if err != nil {
return xerrors.Errorf("writing SQL transaction: %w", err)
}
log.Infof("Inserted task %v", did)
log.Infof("Check your lotus-provider logs for more details.")
return nil
},
}
var wdPostHereCmd = &cli.Command{
Name: "here",
Aliases: []string{"cli"},
Usage: "Compute WindowPoSt for performance and configuration testing.",
Description: `Note: This command is intended to be used to verify PoSt compute performance.
It will not send any messages to the chain. Since it can compute any deadline, output may be incorrectly timed for the chain.`,
ArgsUsage: "[deadline index]",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "deadline",
Usage: "deadline to compute WindowPoSt for ",
Value: 0,
},
&cli.StringSliceFlag{
Name: "layers",
Usage: "list of layers to be interpreted (atop defaults). Default: base",
Value: cli.NewStringSlice("base"),
},
&cli.StringFlag{
Name: "storage-json",
Usage: "path to json file containing storage config",
Value: "~/.lotus-provider/storage.json",
},
&cli.Uint64Flag{
Name: "partition",
Usage: "partition to compute WindowPoSt for",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
deps, err := getDeps(ctx, cctx)
if err != nil {
return err
}
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw,
deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
_, _ = wdPoStSubmitTask, derlareRecoverTask
if len(deps.maddrs) == 0 {
return errors.New("no miners to compute WindowPoSt for")
}
head, err := deps.full.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("failed to get chain head: %w", err)
}
di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0)
for _, maddr := range deps.maddrs {
out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition"))
if err != nil {
fmt.Println("Error computing WindowPoSt for miner", maddr, err)
continue
}
fmt.Println("Computed WindowPoSt for miner", maddr, ":")
err = json.NewEncoder(os.Stdout).Encode(out)
if err != nil {
fmt.Println("Could not encode WindowPoSt output for miner", maddr, err)
continue
}
}
return nil
},
}
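
With the test command registered in the lotus-provider CLI (see the main.go hunk above), a scheduled run looks something like lotus-provider test window-post task --deadline 3, and an in-process run like lotus-provider test window-post here --deadline 3 --partition 0; the command shape is inferred from the Name, Aliases and Flags fields declared in this file rather than quoted from documentation.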

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"encoding/base64"
"fmt"
"net"
@ -15,6 +16,7 @@ import (
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/pkg/errors"
"github.com/samber/lo"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
@ -40,6 +42,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/provider/lpwinning"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
@ -113,6 +116,12 @@ var runCmd = &cli.Command{
tag.Insert(metrics.Commit, build.CurrentCommit),
tag.Insert(metrics.NodeType, "provider"),
)
shutdownChan := make(chan struct{})
ctx, ctxclose := context.WithCancel(ctx)
go func() {
<-shutdownChan
ctxclose()
}()
// Register all metric views
/*
if err := view.Register(
@ -130,116 +139,12 @@ var runCmd = &cli.Command{
}
}
// Open repo
deps, err := getDeps(ctx, cctx)
repoPath := cctx.String(FlagRepoPath)
fmt.Println("repopath", repoPath)
r, err := repo.NewFS(repoPath)
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
if err := r.Init(repo.Provider); err != nil {
return err
}
}
db, err := makeDB(cctx)
if err != nil {
return err
}
shutdownChan := make(chan struct{})
const unspecifiedAddress = "0.0.0.0"
listenAddr := cctx.String("listen")
addressSlice := strings.Split(listenAddr, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
rip, err := db.GetRoutableIP()
if err != nil {
return err
}
listenAddr = rip + ":" + addressSlice[1]
}
}
///////////////////////////////////////////////////////////////////////
///// Dependency Setup
///////////////////////////////////////////////////////////////////////
// The config feeds into task runners & their helpers
cfg, err := getConfig(cctx, db)
if err != nil {
return err
}
log.Debugw("config", "config", cfg)
var verif storiface.Verifier = ffiwrapper.ProofVerifier
as, err := provider.AddressSelector(&cfg.Addresses)()
if err != nil {
return err
}
de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents)
if err != nil {
return err
}
j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil {
return err
}
defer j.Close()
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo)
if err != nil {
return err
}
defer fullCloser()
sa, err := StorageAuth(cfg.Apis.StorageRPCSecret)
if err != nil {
return xerrors.Errorf(`'%w' while parsing the config toml's
[Apis]
StorageRPCSecret=%v
Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret)
}
al := alerting.NewAlertingSystem(j)
si := paths.NewIndexProxy(al, db, true)
bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return err
}
stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))
// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates)
var maddrs []dtypes.MinerAddress
for _, s := range cfg.Addresses.MinerAddresses {
addr, err := address.NewFromString(s)
if err != nil {
return err
}
maddrs = append(maddrs, dtypes.MinerAddress(addr))
}
log.Infow("providers handled", "maddrs", maddrs)
cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore
///////////////////////////////////////////////////////////////////////
///// Task Selection
@ -261,7 +166,11 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
activeTasks = append(activeTasks, winPoStTask)
}
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
log.Infow("This lotus_provider instance handles",
"miner_addresses", maddrs,
"tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name }))
taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr)
if err != nil {
return err
}
@ -306,7 +215,6 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
//node.ShutdownHandler{Component: "provider", StopFunc: stop},
<-finishCh
return nil
},
}
@ -351,3 +259,147 @@ func StorageAuth(apiKey string) (sealer.StorageAuth, error) {
headers.Add("Authorization", "Bearer "+string(token))
return sealer.StorageAuth(headers), nil
}
type Deps struct {
cfg *config.LotusProviderConfig
db *harmonydb.DB
full api.FullNode
verif storiface.Verifier
lw *sealer.LocalWorker
as *ctladdr.AddressSelector
maddrs []dtypes.MinerAddress
stor *paths.Remote
si *paths.IndexProxy
localStore *paths.Local
listenAddr string
}
func getDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) {
// Open repo
repoPath := cctx.String(FlagRepoPath)
fmt.Println("repopath", repoPath)
r, err := repo.NewFS(repoPath)
if err != nil {
return nil, err
}
ok, err := r.Exists()
if err != nil {
return nil, err
}
if !ok {
if err := r.Init(repo.Provider); err != nil {
return nil, err
}
}
db, err := makeDB(cctx)
if err != nil {
return nil, err
}
///////////////////////////////////////////////////////////////////////
///// Dependency Setup
///////////////////////////////////////////////////////////////////////
// The config feeds into task runners & their helpers
cfg, err := getConfig(cctx, db)
if err != nil {
return nil, err
}
log.Debugw("config", "config", cfg)
var verif storiface.Verifier = ffiwrapper.ProofVerifier
as, err := provider.AddressSelector(&cfg.Addresses)()
if err != nil {
return nil, err
}
de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents)
if err != nil {
return nil, err
}
j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de)
if err != nil {
return nil, err
}
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo)
if err != nil {
return nil, err
}
go func() {
select {
case <-ctx.Done():
fullCloser()
_ = j.Close()
}
}()
sa, err := StorageAuth(cfg.Apis.StorageRPCSecret)
if err != nil {
return nil, xerrors.Errorf(`'%w' while parsing the config toml's
[Apis]
StorageRPCSecret=%v
Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret)
}
al := alerting.NewAlertingSystem(j)
si := paths.NewIndexProxy(al, db, true)
bls := &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
listenAddr := cctx.String("listen")
const unspecifiedAddress = "0.0.0.0"
addressSlice := strings.Split(listenAddr, ":")
if ip := net.ParseIP(addressSlice[0]); ip != nil {
if ip.String() == unspecifiedAddress {
rip, err := db.GetRoutableIP()
if err != nil {
return nil, err
}
listenAddr = rip + ":" + addressSlice[1]
}
}
localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"})
if err != nil {
return nil, err
}
stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore()))
// todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper
// maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably
// don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask)
lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates)
var maddrs []dtypes.MinerAddress
for _, s := range cfg.Addresses.MinerAddresses {
addr, err := address.NewFromString(s)
if err != nil {
return nil, err
}
maddrs = append(maddrs, dtypes.MinerAddress(addr))
}
return &Deps{ // lint: intentionally not-named so it will fail if one is forgotten
cfg,
db,
full,
verif,
lw,
as,
maddrs,
stor,
si,
localStore,
listenAddr,
}, nil
}

View File

@ -157,7 +157,8 @@ var terminationsCmd = &cli.Command{
}
for _, t := range termParams.Terminations {
sectors, err := minerSt.LoadSectors(&t.Sectors)
tmp := t.Sectors
sectors, err := minerSt.LoadSectors(&tmp)
if err != nil {
return err
}

View File

@ -72,7 +72,13 @@ func (t *task1) Adder(add harmonytask.AddTaskFunc) {
}
}
func init() {
//logging.SetLogLevel("harmonydb", "debug")
//logging.SetLogLevel("harmonytask", "debug")
}
func TestHarmonyTasks(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
t1 := &task1{
@ -82,7 +88,7 @@ func TestHarmonyTasks(t *testing.T) {
harmonytask.POLL_DURATION = time.Millisecond * 100
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
e.GracefullyTerminate(time.Minute)
expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted)
@ -154,6 +160,7 @@ func fooLetterSaver(t *testing.T, cdb *harmonydb.DB, dest *[]string) *passthru {
}
func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
@ -164,7 +171,7 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
require.NoError(t, err)
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5)
sort.Strings(dest)
@ -173,14 +180,15 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
}
func TestWorkStealing(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
ctx := context.Background()
// The dead worker will be played by a few SQL INSERTS.
_, err := cdb.Exec(ctx, `INSERT INTO harmony_machines
(id, last_contact,host_and_port, cpu, ram, gpu, gpuram)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1, 1000000)`)
(id, last_contact,host_and_port, cpu, ram, gpu)
VALUES (300, DATE '2000-01-01', 'test:1', 4, 400000, 1)`)
require.ErrorIs(t, err, nil)
_, err = cdb.Exec(ctx, `INSERT INTO harmony_task
(id, name, owner_id, posted_time, added_by)
@ -194,13 +202,14 @@ func TestWorkStealing(t *testing.T) {
var dest []string
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
require.ErrorIs(t, err, nil)
time.Sleep(3 * time.Second) // do the work. FLAKYNESS RISK HERE.
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5)
require.Equal(t, []string{"M"}, dest)
})
}
func TestTaskRetry(t *testing.T) {
//t.Parallel()
withDbSetup(t, func(m *kit.TestMiner) {
cdb := m.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
senderParty := fooLetterAdder(t, cdb)
@ -232,7 +241,7 @@ func TestTaskRetry(t *testing.T) {
}
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err)
time.Sleep(3 * time.Second)
time.Sleep(time.Second)
sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour)
sort.Strings(dest)

View File

@ -3,7 +3,6 @@ package harmonydb
import (
"context"
"embed"
"errors"
"fmt"
"math/rand"
"net"
@ -17,6 +16,7 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/node/config"
)
@ -205,16 +205,16 @@ func ensureSchemaExists(connString, schema string) error {
p, err := pgx.Connect(ctx, connString)
defer cncl()
if err != nil {
return fmt.Errorf("unable to connect to db: %s, err: %v", connString, err)
return xerrors.Errorf("unable to connect to db: %s, err: %v", connString, err)
}
defer func() { _ = p.Close(context.Background()) }()
if len(schema) < 5 || !schemaRE.MatchString(schema) {
return errors.New("schema must be of the form " + schemaREString + "\n Got: " + schema)
return xerrors.New("schema must be of the form " + schemaREString + "\n Got: " + schema)
}
_, err = p.Exec(context.Background(), "CREATE SCHEMA IF NOT EXISTS "+schema)
if err != nil {
return fmt.Errorf("cannot create schema: %w", err)
return xerrors.Errorf("cannot create schema: %w", err)
}
return nil
}
@ -232,7 +232,7 @@ func (db *DB) upgrade() error {
)`)
if err != nil {
logger.Error("Upgrade failed.")
return err
return xerrors.Errorf("Cannot create base table %w", err)
}
// __Run scripts in order.__
@ -243,10 +243,10 @@ func (db *DB) upgrade() error {
err = db.Select(context.Background(), &landedEntries, "SELECT entry FROM base")
if err != nil {
logger.Error("Cannot read entries: " + err.Error())
return err
return xerrors.Errorf("cannot read entries: %w", err)
}
for _, l := range landedEntries {
landed[l.Entry] = true
landed[l.Entry[:8]] = true
}
}
dir, err := fs.ReadDir("sql")
@ -261,7 +261,11 @@ func (db *DB) upgrade() error {
}
for _, e := range dir {
name := e.Name()
if landed[name] || !strings.HasSuffix(name, ".sql") {
if !strings.HasSuffix(name, ".sql") {
logger.Debug("Must have only SQL files here, found: " + name)
continue
}
if landed[name[:8]] {
logger.Debug("DB Schema " + name + " already applied.")
continue
}
@ -278,15 +282,15 @@ func (db *DB) upgrade() error {
if err != nil {
msg := fmt.Sprintf("Could not upgrade! File %s, Query: %s, Returned: %s", name, s, err.Error())
logger.Error(msg)
return errors.New(msg) // makes devs lives easier by placing message at the end.
return xerrors.New(msg) // makes devs lives easier by placing message at the end.
}
}
// Mark Completed.
_, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name)
_, err = db.Exec(context.Background(), "INSERT INTO base (entry) VALUES ($1)", name[:8])
if err != nil {
logger.Error("Cannot update base: " + err.Error())
return fmt.Errorf("cannot insert into base: %w", err)
return xerrors.Errorf("cannot insert into base: %w", err)
}
}
return nil
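
The upgrade loop above now tracks applied migrations by the first eight characters of each .sql filename, both when loading the base table and when recording a new entry, so a script renamed after its prefix is not re-run. A small standalone sketch of that keying; the date-style prefixes in the example filenames are an assumption, not something this diff shows:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Entries in the base table are stored as 8-character prefixes.
    landed := map[string]bool{"20230719": true}

    for _, name := range []string{"20230719-initial.sql", "20231120-harmony-test.sql", "README.md"} {
        if !strings.HasSuffix(name, ".sql") {
            fmt.Println("skipping non-SQL file:", name) // mirrors the new Debug log above
            continue
        }
        if landed[name[:8]] {
            fmt.Println("already applied:", name)
            continue
        }
        fmt.Println("would run and record:", name, "as", name[:8])
    }
}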

View File

@ -0,0 +1,6 @@
CREATE TABLE harmony_test (
task_id bigint
constraint harmony_test_pk
primary key,
options text
);

View File

@ -111,7 +111,9 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er
var commit bool
defer func() { // Panic clean-up.
if !commit {
retErr = tx.Rollback(ctx)
if tmp := tx.Rollback(ctx); tmp != nil {
retErr = tmp
}
}
}()
commit, err = f(&Tx{tx, ctx})
@ -119,7 +121,7 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, er
return false, err
}
if commit {
err := tx.Commit(ctx)
err = tx.Commit(ctx)
if err != nil {
return false, err
}

View File

@ -198,7 +198,7 @@ func (h *taskTypeHandler) recordCompletion(tID TaskID, workStart time.Time, done
}
}
_, err = tx.Exec(`INSERT INTO harmony_task_history
(task_id, name, posted, work_start, work_end, result, by_host_and_port, err)
(task_id, name, posted, work_start, work_end, result, completed_by_host_and_port, err)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, tID, h.Name, postedTime, workStart, workEnd, done, h.TaskEngine.hostAndPort, result)
if err != nil {
return false, fmt.Errorf("could not write history: %w", err)

View File

@ -93,8 +93,9 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
}
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC())
ct, err := db.Exec(ctx,
`DELETE FROM harmony_machines WHERE last_contact < CURRENT_TIMESTAMP - INTERVAL '1 MILLISECOND' * $1 `,
LOOKS_DEAD_TIMEOUT.Milliseconds()) // ms enables unit testing to change timeout.
if err != nil {
logger.Warn("unable to delete old machines: ", err)
}

View File

@ -16,7 +16,7 @@ import (
func basicTest(t *testing.T, repo Repo) {
apima, err := repo.APIEndpoint()
if assert.Error(t, err) {
assert.Equal(t, ErrNoAPIEndpoint, err)
assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error())
}
assert.Nil(t, apima, "with no api endpoint, return should be nil")
@ -72,7 +72,7 @@ func basicTest(t *testing.T, repo Repo) {
apima, err = repo.APIEndpoint()
if assert.Error(t, err) {
assert.Equal(t, ErrNoAPIEndpoint, err, "after closing repo, api should be nil")
assert.ErrorContains(t, err, ErrNoAPIEndpoint.Error(), "after closing repo, api should be nil")
}
assert.Nil(t, apima, "with closed repo, apima should be set back to nil")
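
The switch from assert.Equal to assert.ErrorContains suggests APIEndpoint now returns a wrapped error rather than the sentinel itself; value equality fails on a wrapped error even though the message, which ErrorContains inspects, is still there. A standalone sketch with a stand-in sentinel (the real error text may differ):

package main

import (
    "errors"
    "fmt"
    "strings"
)

// Stand-in for the repo package's ErrNoAPIEndpoint sentinel.
var ErrNoAPIEndpoint = errors.New("API not running (no endpoint)")

func main() {
    err := fmt.Errorf("getting API endpoint: %w", ErrNoAPIEndpoint)

    fmt.Println(err == ErrNoAPIEndpoint)          // false: an Equal-style check now fails
    fmt.Println(errors.Is(err, ErrNoAPIEndpoint)) // true: the sentinel is still in the chain
    // assert.ErrorContains effectively checks the message substring:
    fmt.Println(strings.Contains(err.Error(), ErrNoAPIEndpoint.Error())) // true
}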

View File

@ -18,6 +18,10 @@ import (
var log = logging.Logger("lpmessage")
type str string // makes ctx value collissions impossible
var CtxTaskID str = "task_id"
type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
@ -47,8 +51,7 @@ func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
return &Sender{
api: api,
signer: signer,
db: db,
db: db,
}
}
@ -100,6 +103,14 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
var sigMsg *types.SignedMessage
var idCount int
err = s.db.QueryRow(ctx, `SELECT COUNT(*) FROM harmony_test WHERE task_id=$1`,
ctx.Value(CtxTaskID)).Scan(&idCount)
if err != nil {
return cid.Undef, xerrors.Errorf("reading harmony_test: %w", err)
}
noSend := idCount == 1
// start db tx
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
@ -137,6 +148,18 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return false, xerrors.Errorf("marshaling message: %w", err)
}
if noSend {
log.Errorw("SKIPPED SENDING MESSAGE PER ENVIRONMENT VARIABLE - NOT PRODUCTION SAFE",
"from_key", fromA.String(),
"nonce", msg.Nonce,
"to_addr", msg.To.String(),
"signed_data", data,
"signed_json", string(jsonBytes),
"signed_cid", sigMsg.Cid(),
"send_reason", reason,
)
return true, nil // nothing committed
}
// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
@ -153,6 +176,9 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
}
if noSend {
return sigMsg.Cid(), nil
}
// push to mpool
_, err = s.api.MpoolPush(ctx, sigMsg)
@ -168,7 +194,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", cn)
}
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
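
Together with the new harmony_test table and the wdpost submit-task change later in this commit, this forms a test-only no-send path: the test CLI inserts the task id into harmony_test, the submit task carries that id in the context, and Send then signs and records the message but returns before MpoolPush. A sketch assembled from those hunks; the import paths are assumed from the packages referenced in this diff rather than quoted:

package wdposttest

import (
    "context"

    "github.com/ipfs/go-cid"

    "github.com/filecoin-project/lotus/api"
    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
    "github.com/filecoin-project/lotus/provider/lpmessage"
)

// sendForTest shows how a task opts its message out of mpool submission.
// taskID must already exist in harmony_test (the `test window-post task`
// command inserts it when scheduling the run).
func sendForTest(sender *lpmessage.Sender, taskID harmonytask.TaskID,
    msg *types.Message, mss *api.MessageSendSpec) (cid.Cid, error) {
    ctx := context.WithValue(context.Background(), lpmessage.CtxTaskID, taskID)
    return sender.Send(ctx, msg, mss, "wdpost") // signs and logs, skips the mpool when task_id is found
}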

View File

@ -31,7 +31,7 @@ import (
const disablePreChecks = false // todo config
func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) {
func (t *WdPostTask) DoPartition(ctx context.Context, ts *types.TipSet, maddr address.Address, di *dline.Info, partIdx uint64) (out *miner2.SubmitWindowedPoStParams, err error) {
defer func() {
if r := recover(); r != nil {
log.Errorf("recover: %s", r)
@ -207,8 +207,9 @@ func (t *WdPostTask) doPartition(ctx context.Context, ts *types.TipSet, maddr ad
time.Sleep(5 * time.Second)
continue todo retry loop */
} else if !correct {
_ = correct
/*log.Errorw("generated incorrect window post proof", "post", postOut, "error", err)
continue todo retry loop */
continue todo retry loop*/
}
// Proof generation successful, stop retrying
@ -322,11 +323,11 @@ func (t *WdPostTask) sectorsForProof(ctx context.Context, maddr address.Address,
if err := allSectors.ForEach(func(sectorNo uint64) error {
if info, found := sectorByID[sectorNo]; found {
proofSectors = append(proofSectors, info)
} else {
//skip
// todo: testing: old logic used to put 'substitute' sectors here
// that probably isn't needed post nv19, but we do need to check that
}
} //else {
//skip
// todo: testing: old logic used to put 'substitute' sectors here
// that probably isn't needed post nv19, but we do need to check that
//}
return nil
}); err != nil {
return nil, xerrors.Errorf("iterating partition sector bitmap: %w", err)

View File

@ -74,10 +74,10 @@ type WdPostTask struct {
}
type wdTaskIdentity struct {
Sp_id uint64
Proving_period_start abi.ChainEpoch
Deadline_index uint64
Partition_index uint64
SpID uint64 `db:"sp_id"`
ProvingPeriodStart abi.ChainEpoch `db:"proving_period_start"`
DeadlineIndex uint64 `db:"deadline_index"`
PartitionIndex uint64 `db:"partition_index"`
}
func NewWdPostTask(db *harmonydb.DB,
@ -150,7 +150,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, err
}
postOut, err := t.doPartition(context.Background(), ts, maddr, deadline, partIdx)
postOut, err := t.DoPartition(context.Background(), ts, maddr, deadline, partIdx)
if err != nil {
log.Errorf("WdPostTask.Do() failed to doPartition: %v", err)
return false, err
@ -206,11 +206,11 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
// GetData for tasks
type wdTaskDef struct {
Task_id harmonytask.TaskID
Sp_id uint64
Proving_period_start abi.ChainEpoch
Deadline_index uint64
Partition_index uint64
TaskID harmonytask.TaskID
SpID uint64
ProvingPeriodStart abi.ChainEpoch
DeadlineIndex uint64
PartitionIndex uint64
dlInfo *dline.Info `pgx:"-"`
openTs *types.TipSet
@ -232,10 +232,10 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
// Accept those past deadline, then delete them in Do().
for i := range tasks {
tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].Proving_period_start, tasks[i].Deadline_index, ts.Height())
tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].ProvingPeriodStart, tasks[i].DeadlineIndex, ts.Height())
if tasks[i].dlInfo.PeriodElapsed() {
return &tasks[i].Task_id, nil
return &tasks[i].TaskID, nil
}
tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key())
@ -281,7 +281,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
var r int
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
FROM harmony_task_history
WHERE task_id = $1 AND result = false`, d.Task_id).Scan(&r)
WHERE task_id = $1 AND result = false`, d.TaskID).Scan(&r)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
}
@ -293,7 +293,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open
})
return &tasks[0].Task_id, nil
return &tasks[0].TaskID, nil
}
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
@ -353,10 +353,10 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types
for pidx := range partitions {
tid := wdTaskIdentity{
Sp_id: aid,
Proving_period_start: di.PeriodStart,
Deadline_index: di.Index,
Partition_index: uint64(pidx),
SpID: aid,
ProvingPeriodStart: di.PeriodStart,
DeadlineIndex: di.Index,
PartitionIndex: uint64(pidx),
}
tf := t.windowPoStTF.Val(ctx)
@ -384,10 +384,10 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden
partition_index
) VALUES ($1, $2, $3, $4, $5)`,
taskId,
taskIdent.Sp_id,
taskIdent.Proving_period_start,
taskIdent.Deadline_index,
taskIdent.Partition_index,
taskIdent.SpID,
taskIdent.ProvingPeriodStart,
taskIdent.DeadlineIndex,
taskIdent.PartitionIndex,
)
if err != nil {
return false, xerrors.Errorf("insert partition task: %w", err)

View File

@ -1,33 +0,0 @@
package lpwindow
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/node/config"
)
// test to create WDPostTask, invoke AddTask and check if the task is added to the DB
func TestAddTask(t *testing.T) {
db, err := harmonydb.NewFromConfig(config.HarmonyDB{
Hosts: []string{"localhost"},
Port: "5433",
Username: "yugabyte",
Password: "yugabyte",
Database: "yugabyte",
})
require.NoError(t, err)
wdPostTask := NewWdPostTask(db, nil, 0)
taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
_ = taskEngine
ts := types.TipSet{}
deadline := dline.Info{}
require.NoError(t, err)
}

View File

@ -166,7 +166,7 @@ func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func
recDecl := miner.RecoveryDeclaration{
Deadline: dlIdx,
Partition: uint64(partIdx),
Partition: partIdx,
Sectors: recovered,
}
@ -187,6 +187,9 @@ func (w *WdPostRecoverDeclareTask) Do(taskID harmonytask.TaskID, stillOwned func
}
msg, mss, err := preparePoStMessage(w.api, w.as, maddr, msg, abi.TokenAmount(w.maxDeclareRecoveriesGasFee))
if err != nil {
return false, xerrors.Errorf("sending declare recoveries message: %w", err)
}
mc, err := w.sender.Send(ctx, msg, mss, "declare-recoveries")
if err != nil {
@ -279,10 +282,10 @@ func (w *WdPostRecoverDeclareTask) processHeadChange(ctx context.Context, revert
}
tid := wdTaskIdentity{
Sp_id: aid,
Proving_period_start: pps,
Deadline_index: declDeadline,
Partition_index: uint64(pidx),
SpID: aid,
ProvingPeriodStart: pps,
DeadlineIndex: declDeadline,
PartitionIndex: uint64(pidx),
}
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
@ -304,10 +307,10 @@ func (w *WdPostRecoverDeclareTask) addTaskToDB(taskId harmonytask.TaskID, taskId
partition_index
) VALUES ($1, $2, $3, $4, $5)`,
taskId,
taskIdent.Sp_id,
taskIdent.Proving_period_start,
taskIdent.Deadline_index,
taskIdent.Partition_index,
taskIdent.SpID,
taskIdent.ProvingPeriodStart,
taskIdent.DeadlineIndex,
taskIdent.PartitionIndex,
)
if err != nil {
return false, xerrors.Errorf("insert partition task: %w", err)

View File

@ -149,14 +149,15 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return false, xerrors.Errorf("preparing proof message: %w", err)
}
smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost")
ctx := context.WithValue(context.Background(), lpmessage.CtxTaskID, taskID)
smsg, err := w.sender.Send(ctx, msg, mss, "wdpost")
if err != nil {
return false, xerrors.Errorf("sending proof message: %w", err)
}
// set message_cid in the wdpost_proofs entry
_, err = w.db.Exec(context.Background(), `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition)
_, err = w.db.Exec(ctx, `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition)
if err != nil {
return true, xerrors.Errorf("updating wdpost_proofs: %w", err)
}
@ -257,7 +258,7 @@ func preparePoStMessage(w MsgPrepAPI, as *ctladdr.AddressSelector, maddr address
msg.From = mi.Worker
mss := &api.MessageSendSpec{
MaxFee: abi.TokenAmount(maxFee),
MaxFee: maxFee,
}
// (optimal) initial estimation with some overestimation that guarantees