Merge pull request #11135 from filecoin-project/sbansal/sector-index

feat: sector index yugabyte implementation
This commit is contained in:
Shrenuj Bansal 2023-08-22 12:42:12 -04:00 committed by GitHub
commit 5b6daa2c6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1211 additions and 14 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/mitchellh/go-homedir"
@ -47,6 +48,7 @@ import (
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/fsjournal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
storageminer "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
@ -463,7 +465,16 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
si := paths.NewIndex(nil)
// TODO: run sector index init only for devnets. This is not needed for longer running networks
harmonyDB, err := harmonydb.New([]string{"127.0.0.1"}, "yugabyte", "yugabyte", "yugabyte", "5433", "",
func(s string) { logging.Logger("harmonydb").Error(s) })
if err != nil {
return err
}
enableSectorIndexDB := true
si := paths.NewIndexProxy(nil, harmonyDB, enableSectorIndexDB)
lstor, err := paths.NewLocal(ctx, lr, si, nil)
if err != nil {

View File

@ -145,6 +145,14 @@
# env var: LOTUS_SUBSYSTEMS_ENABLEMARKETS
#EnableMarkets = false
# When enabled, the sector index will reside in an external database
# as opposed to the local KV store in the miner process
# This is useful to allow workers to bypass the lotus miner to access sector information
#
# type: bool
# env var: LOTUS_SUBSYSTEMS_ENABLESECTORINDEXDB
#EnableSectorIndexDB = false
# type: string
# env var: LOTUS_SUBSYSTEMS_SEALERAPIINFO
#SealerApiInfo = ""

View File

@ -68,7 +68,7 @@ func TestTransaction(t *testing.T) {
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
t.Fatal("E0", err)
}
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool) {
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
t.Fatal("E1", err)
}
@ -90,7 +90,7 @@ func TestTransaction(t *testing.T) {
if sum2 != 4+5+6+7+8+9 {
t.Fatal("Expected 39, got ", sum2)
}
return false // rollback
return false, nil // rollback
})
if err != nil {
t.Fatal("ET", err)

View File

@ -15,6 +15,7 @@ import (
)
func TestPathTypeFilters(t *testing.T) {
runTest := func(t *testing.T, name string, asserts func(t *testing.T, ctx context.Context, miner *kit.TestMiner, run func())) {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

View File

@ -0,0 +1,41 @@
create table sector_location
(
miner_id bigint not null,
sector_num bigint not null,
sector_filetype int not null,
storage_id varchar not null,
is_primary bool,
read_ts timestamp(6),
read_refs int,
write_ts timestamp(6),
write_lock_owner varchar,
constraint sectorlocation_pk
primary key (miner_id, sector_num, sector_filetype, storage_id)
);
create table storage_path
(
"storage_id" varchar not null
constraint "storage_path_pkey"
primary key,
"urls" varchar, -- comma separated list of urls
"weight" bigint,
"max_storage" bigint,
"can_seal" bool,
"can_store" bool,
"groups" varchar, -- comma separated list of group names
"allow_to" varchar, -- comma separated list of allowed groups
"allow_types" varchar, -- comma separated list of allowed file types
"deny_types" varchar, -- comma separated list of denied file types
"capacity" bigint,
"available" bigint,
"fs_available" bigint,
"reserved" bigint,
"used" bigint,
"last_heartbeat" timestamp(6),
"heartbeat_err" varchar
);

View File

@ -100,7 +100,7 @@ type Tx struct {
// BeginTransaction is how you can access transactions using this library.
// The entire transaction happens in the function passed in.
// The return must be true or a rollback will occur.
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (didCommit bool, retErr error) {
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error)) (didCommit bool, retErr error) {
tx, err := db.pgx.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return false, err
@ -111,7 +111,10 @@ func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool)) (
retErr = tx.Rollback(ctx)
}
}()
commit = f(&Tx{tx, ctx})
commit, err = f(&Tx{tx, ctx})
if err != nil {
return false, err
}
if commit {
err := tx.Commit(ctx)
if err != nil {

View File

@ -126,10 +126,14 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(sectorblocks.SectorBuilder), From(new(*sealing.Sealing))),
),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
If(cfg.Subsystems.EnableSectorStorage,
// Sector storage
Override(new(*paths.Index), paths.NewIndex),
Override(new(paths.SectorIndex), From(new(*paths.Index))),
Override(new(*paths.IndexProxy), paths.NewIndexProxyHelper(cfg.Subsystems.EnableSectorIndexDB)),
Override(new(paths.SectorIndex), From(new(*paths.IndexProxy))),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
@ -234,9 +238,6 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(config.HarmonyDB), cfg.HarmonyDB),
Override(new(harmonydb.ITestID), harmonydb.ITestID("")),
Override(new(*ctladdr.AddressSelector), modules.AddressSelector(&cfg.Addresses)),
Override(new(*harmonydb.DB), func(cfg config.HarmonyDB, id harmonydb.ITestID) (*harmonydb.DB, error) {
return harmonydb.NewFromConfigWithITestID(cfg)(id)
}),
)
}

View File

@ -235,6 +235,7 @@ func DefaultStorageMiner() *StorageMiner {
EnableSealing: true,
EnableSectorStorage: true,
EnableMarkets: false,
EnableSectorIndexDB: false,
},
Fees: MinerFeeConfig{

View File

@ -71,6 +71,8 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
fmt.Println(s)
fmt.Println(c)
fmt.Println(c2)
require.True(t, reflect.DeepEqual(c, c2))
}

View File

@ -747,6 +747,14 @@ over the worker address if this flag is set.`,
Comment: ``,
},
{
Name: "EnableSectorIndexDB",
Type: "bool",
Comment: `When enabled, the sector index will reside in an external database
as opposed to the local KV store in the miner process
This is useful to allow workers to bypass the lotus miner to access sector information`,
},
{
Name: "SealerApiInfo",
Type: "string",

View File

@ -110,6 +110,11 @@ type MinerSubsystemConfig struct {
EnableSectorStorage bool
EnableMarkets bool
// When enabled, the sector index will reside in an external database
// as opposed to the local KV store in the miner process
// This is useful to allow workers to bypass the lotus miner to access sector information
EnableSectorIndexDB bool
SealerApiInfo string // if EnableSealing == false
SectorIndexApiInfo string // if EnableSectorStorage == false
}

1001
storage/paths/db_index.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -215,6 +215,7 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
lastHeartbeat: time.Now(),
}
return nil
}

View File

@ -0,0 +1,118 @@
package paths
import (
"context"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type IndexProxy struct {
memIndex *Index
dbIndex *DBIndex
enableSectorIndexDB bool
}
func NewIndexProxyHelper(enableSectorIndexDB bool) func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
return func(al *alerting.Alerting, db *harmonydb.DB) *IndexProxy {
return NewIndexProxy(al, db, enableSectorIndexDB)
}
}
func NewIndexProxy(al *alerting.Alerting, db *harmonydb.DB, enableSectorIndexDB bool) *IndexProxy {
return &IndexProxy{
memIndex: NewIndex(al),
dbIndex: NewDBIndex(al, db),
enableSectorIndexDB: enableSectorIndexDB,
}
}
func (ip *IndexProxy) StorageAttach(ctx context.Context, info storiface.StorageInfo, stat fsutil.FsStat) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageAttach(ctx, info, stat)
}
return ip.memIndex.StorageAttach(ctx, info, stat)
}
func (ip *IndexProxy) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDetach(ctx, id, url)
}
return ip.memIndex.StorageDetach(ctx, id, url)
}
func (ip *IndexProxy) StorageInfo(ctx context.Context, id storiface.ID) (storiface.StorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageInfo(ctx, id)
}
return ip.memIndex.StorageInfo(ctx, id)
}
func (ip *IndexProxy) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageReportHealth(ctx, id, report)
}
return ip.memIndex.StorageReportHealth(ctx, id, report)
}
func (ip *IndexProxy) StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
}
return ip.memIndex.StorageDeclareSector(ctx, storageID, s, ft, primary)
}
func (ip *IndexProxy) StorageDropSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageDropSector(ctx, storageID, s, ft)
}
return ip.memIndex.StorageDropSector(ctx, storageID, s, ft)
}
func (ip *IndexProxy) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, ssize abi.SectorSize, allowFetch bool) ([]storiface.SectorStorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
}
return ip.memIndex.StorageFindSector(ctx, sector, ft, ssize, allowFetch)
}
func (ip *IndexProxy) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, ssize abi.SectorSize, pathType storiface.PathType) ([]storiface.StorageInfo, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
}
return ip.memIndex.StorageBestAlloc(ctx, allocate, ssize, pathType)
}
func (ip *IndexProxy) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageLock(ctx, sector, read, write)
}
return ip.memIndex.StorageLock(ctx, sector, read, write)
}
func (ip *IndexProxy) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageTryLock(ctx, sector, read, write)
}
return ip.memIndex.StorageTryLock(ctx, sector, read, write)
}
func (ip *IndexProxy) StorageGetLocks(ctx context.Context) (storiface.SectorLocks, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageGetLocks(ctx)
}
return ip.memIndex.StorageGetLocks(ctx)
}
func (ip *IndexProxy) StorageList(ctx context.Context) (map[storiface.ID][]storiface.Decl, error) {
if ip.enableSectorIndexDB {
return ip.dbIndex.StorageList(ctx)
}
return ip.memIndex.StorageList(ctx)
}
var _ SectorIndex = &IndexProxy{}

View File

@ -16,10 +16,6 @@ const (
AcquireCopy AcquireMode = "copy"
)
type Refs struct {
RefCount [FileTypes]uint
}
type SectorLock struct {
Sector abi.SectorID
Write [FileTypes]uint