Merge branch 'feat/sturdypost' into feat/lotus-provider
This commit is contained in:
commit
1e2a16bd8a
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/libp2p/go-libp2p/core/crypto"
|
"github.com/libp2p/go-libp2p/core/crypto"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/mitchellh/go-homedir"
|
"github.com/mitchellh/go-homedir"
|
||||||
@ -47,6 +48,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/genesis"
|
"github.com/filecoin-project/lotus/genesis"
|
||||||
"github.com/filecoin-project/lotus/journal"
|
"github.com/filecoin-project/lotus/journal"
|
||||||
"github.com/filecoin-project/lotus/journal/fsjournal"
|
"github.com/filecoin-project/lotus/journal/fsjournal"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||||
storageminer "github.com/filecoin-project/lotus/miner"
|
storageminer "github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node/config"
|
"github.com/filecoin-project/lotus/node/config"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
@ -463,7 +465,16 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
|||||||
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
|
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
|
||||||
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
|
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
|
||||||
|
|
||||||
si := paths.NewIndex(nil)
|
// TODO: run sector index init only for devnets. This is not needed for longer running networks
|
||||||
|
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
|
||||||
|
func(s string) { logging.Logger("harmonydb").Error(s) })
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
enableSectorIndexDB := true
|
||||||
|
|
||||||
|
si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)
|
||||||
|
|
||||||
lstor, err := paths.NewLocal(ctx, lr, si, nil)
|
lstor, err := paths.NewLocal(ctx, lr, si, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -145,6 +145,14 @@
|
|||||||
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
|
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
|
||||||
#EnableMarkets = false
|
#EnableMarkets = false
|
||||||
|
|
||||||
|
# When enabled, the sector index will reside in an external database
|
||||||
|
# as opposed to the local KV store in the miner process
|
||||||
|
# This is useful to allow workers to bypass the lotus miner to access sector information
|
||||||
|
#
|
||||||
|
# type: bool
|
||||||
|
# env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB
|
||||||
|
#EnableSectorIndexDB = false
|
||||||
|
|
||||||
# type: string
|
# type: string
|
||||||
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
|
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
|
||||||
#SealerApiInfo = ""
|
#SealerApiInfo = ""
|
||||||
|
@ -68,7 +68,7 @@ func TestTransaction(t *testing.T) {
|
|||||||
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
|
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
|
||||||
t.Fatal("E0", err)
|
t.Fatal("E0", err)
|
||||||
}
|
}
|
||||||
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
|
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
|
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
|
||||||
t.Fatal("E1", err)
|
t.Fatal("E1", err)
|
||||||
}
|
}
|
||||||
@ -90,7 +90,7 @@ func TestTransaction(t *testing.T) {
|
|||||||
if sum2 != 4+5+6+7+8+9 {
|
if sum2 != 4+5+6+7+8+9 {
|
||||||
t.Fatal("Expected 39, got ", sum2)
|
t.Fatal("Expected 39, got ", sum2)
|
||||||
}
|
}
|
||||||
return false // rollback
|
return false, nil // rollback
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("ET", err)
|
t.Fatal("ET", err)
|
||||||
|
@ -15,6 +15,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestPathTypeFilters(t *testing.T) {
|
func TestPathTypeFilters(t *testing.T) {
|
||||||
|
|
||||||
runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
|
runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
41
lib/harmony/harmonydb/sql/20230712.sql
Normal file
41
lib/harmony/harmonydb/sql/20230712.sql
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
create table sector_location
|
||||||
|
(
|
||||||
|
miner_id bigint not null,
|
||||||
|
sector_num bigint not null,
|
||||||
|
sector_filetype int not null,
|
||||||
|
storage_id varchar not null,
|
||||||
|
is_primary bool,
|
||||||
|
read_ts timestamp(6),
|
||||||
|
read_refs int,
|
||||||
|
write_ts timestamp(6),
|
||||||
|
write_lock_owner varchar,
|
||||||
|
constraint sectorlocation_pk
|
||||||
|
primary key (miner_id, sector_num, sector_filetype, storage_id)
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
create table storage_path
|
||||||
|
(
|
||||||
|
"storage_id" varchar not null
|
||||||
|
constraint "storage_path_pkey"
|
||||||
|
primary key,
|
||||||
|
"urls" varchar, -- comma separated list of urls
|
||||||
|
"weight" bigint,
|
||||||
|
"max_storage" bigint,
|
||||||
|
"can_seal" bool,
|
||||||
|
"can_store" bool,
|
||||||
|
"groups" varchar, -- comma separated list of group names
|
||||||
|
"allow_to" varchar, -- comma separated list of allowed groups
|
||||||
|
"allow_types" varchar, -- comma separated list of allowed file types
|
||||||
|
"deny_types" varchar, -- comma separated list of denied file types
|
||||||
|
|
||||||
|
"capacity" bigint,
|
||||||
|
"available" bigint,
|
||||||
|
"fs_available" bigint,
|
||||||
|
"reserved" bigint,
|
||||||
|
"used" bigint,
|
||||||
|
"last_heartbeat" timestamp(6),
|
||||||
|
"heartbeat_err" varchar
|
||||||
|
);
|
||||||
|
|
@ -100,7 +100,7 @@ type Tx struct {
|
|||||||
// BeginTransaction is how you can access transactions using this library.
|
// BeginTransaction is how you can access transactions using this library.
|
||||||
// The entire transaction happens in the function passed in.
|
// The entire transaction happens in the function passed in.
|
||||||
// The return must be true or a rollback will occur.
|
// The return must be true or a rollback will occur.
|
||||||
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
|
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
|
||||||
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
|
|||||||
retErr = tx.Rollback(ctx)
|
retErr = tx.Rollback(ctx)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
commit = f(&Tx{tx, ctx})
|
commit, err = f(&Tx{tx, ctx})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
if commit {
|
if commit {
|
||||||
err := tx.Commit(ctx)
|
err := tx.Commit(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -126,10 +126,14 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
|
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
|
||||||
|
return harmonydb.NewFromConfigWithITestID(cfg)(id)
|
||||||
|
}),
|
||||||
|
|
||||||
If(cfg.Subsystems.EnableSectorStorage,
|
If(cfg.Subsystems.EnableSectorStorage,
|
||||||
// Sector storage
|
// Sector storage
|
||||||
Override(new(*paths.Index), paths.NewIndex),
|
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)),
|
||||||
Override(new(paths.SectorIndex), From(new(*paths.Index))),
|
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
|
||||||
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
||||||
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
|
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
|
||||||
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
|
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
|
||||||
@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
Override(new(config.HarmonyDB), cfg.HarmonyDB),
|
Override(new(config.HarmonyDB), cfg.HarmonyDB),
|
||||||
Override(new(harmonydb.ITestID), harmonydb.ITestID("")),
|
Override(new(harmonydb.ITestID), harmonydb.ITestID("")),
|
||||||
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
|
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
|
||||||
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
|
|
||||||
return harmonydb.NewFromConfigWithITestID(cfg)(id)
|
|
||||||
}),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner {
|
|||||||
EnableSealing: true,
|
EnableSealing: true,
|
||||||
EnableSectorStorage: true,
|
EnableSectorStorage: true,
|
||||||
EnableMarkets: false,
|
EnableMarkets: false,
|
||||||
|
EnableSectorIndexDB: false,
|
||||||
},
|
},
|
||||||
|
|
||||||
Fees: MinerFeeConfig{
|
Fees: MinerFeeConfig{
|
||||||
|
@ -71,6 +71,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
|
|||||||
|
|
||||||
fmt.Println(s)
|
fmt.Println(s)
|
||||||
|
|
||||||
|
fmt.Println(c)
|
||||||
|
fmt.Println(c2)
|
||||||
require.True(t, reflect.DeepEqual(c, c2))
|
require.True(t, reflect.DeepEqual(c, c2))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -747,6 +747,14 @@ over the worker address if this flag is set.`,
|
|||||||
|
|
||||||
Comment: ``,
|
Comment: ``,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "EnableSectorIndexDB",
|
||||||
|
Type: "bool",
|
||||||
|
|
||||||
|
Comment: `When enabled, the sector index will reside in an external database
|
||||||
|
as opposed to the local KV store in the miner process
|
||||||
|
This is useful to allow workers to bypass the lotus miner to access sector information`,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Name: "SealerApiInfo",
|
Name: "SealerApiInfo",
|
||||||
Type: "string",
|
Type: "string",
|
||||||
|
@ -110,6 +110,11 @@ type MinerSubsystemConfig struct {
|
|||||||
EnableSectorStorage bool
|
EnableSectorStorage bool
|
||||||
EnableMarkets bool
|
EnableMarkets bool
|
||||||
|
|
||||||
|
// When enabled, the sector index will reside in an external database
|
||||||
|
// as opposed to the local KV store in the miner process
|
||||||
|
// This is useful to allow workers to bypass the lotus miner to access sector information
|
||||||
|
EnableSectorIndexDB bool
|
||||||
|
|
||||||
SealerApiInfo string // if EnableSealing == false
|
SealerApiInfo string // if EnableSealing == false
|
||||||
SectorIndexApiInfo string // if EnableSectorStorage == false
|
SectorIndexApiInfo string // if EnableSectorStorage == false
|
||||||
}
|
}
|
||||||
|
1001
storage/paths/db_index.go
Normal file
1001
storage/paths/db_index.go
Normal file
File diff suppressed because it is too large
Load Diff
@ -215,6 +215,7 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
|
|||||||
|
|
||||||
lastHeartbeat: time.Now(),
|
lastHeartbeat: time.Now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
118
storage/paths/index_proxy.go
Normal file
118
storage/paths/index_proxy.go
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
package paths
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/journal/alerting"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IndexProxy struct {
|
||||||
|
memIndex *Index
|
||||||
|
dbIndex *DBIndex
|
||||||
|
enableSectorIndexDB bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||||
|
return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
|
||||||
|
return NewIndexProxy(al, db, enableSectorIndexDB)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy {
|
||||||
|
return &IndexProxy{
|
||||||
|
memIndex: NewIndex(al),
|
||||||
|
dbIndex: NewDBIndex(al, db),
|
||||||
|
enableSectorIndexDB: enableSectorIndexDB,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageAttach(ctx, info, stat)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageAttach(ctx, info, stat)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageDetach(ctx, id, url)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageDetach(ctx, id, url)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageInfo(ctx, id)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageInfo(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageReportHealth(ctx, id, report)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageReportHealth(ctx, id, report)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageDropSector(ctx, storageID, s, ft)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageLock(ctx, sector, read, write)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageLock(ctx, sector, read, write)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageTryLock(ctx, sector, read, write)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageGetLocks(ctx)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageGetLocks(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
|
||||||
|
if ip.enableSectorIndexDB {
|
||||||
|
return ip.dbIndex.StorageList(ctx)
|
||||||
|
}
|
||||||
|
return ip.memIndex.StorageList(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ SectorIndex = &IndexProxy{}
|
@ -16,10 +16,6 @@ const (
|
|||||||
AcquireCopy AcquireMode = "copy"
|
AcquireCopy AcquireMode = "copy"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Refs struct {
|
|
||||||
RefCount [FileTypes]uint
|
|
||||||
}
|
|
||||||
|
|
||||||
type SectorLock struct {
|
type SectorLock struct {
|
||||||
Sector abi.SectorID
|
Sector abi.SectorID
|
||||||
Write [FileTypes]uint
|
Write [FileTypes]uint
|
||||||
|
Loading…
Reference in New Issue
Block a user