workers: Simple storage diagnostics
This commit is contained in:
parent
416a0d2722
commit
7e997e40f3
@ -108,6 +108,10 @@ type StorageMiner interface {
|
|||||||
|
|
||||||
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
|
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
|
||||||
|
|
||||||
|
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
|
||||||
|
|
||||||
|
StorageInfo(context.Context, stores.ID) (stores.StorageInfo, error)
|
||||||
|
|
||||||
// WorkerConnect tells the node to connect to workers RPC
|
// WorkerConnect tells the node to connect to workers RPC
|
||||||
WorkerConnect(context.Context, string) error
|
WorkerConnect(context.Context, string) error
|
||||||
stores.SectorIndex
|
stores.SectorIndex
|
||||||
|
@ -185,6 +185,8 @@ type StorageMinerStruct struct {
|
|||||||
WorkerAttachStorage func(context.Context, stores.StorageInfo) error `perm:"admin"`
|
WorkerAttachStorage func(context.Context, stores.StorageInfo) error `perm:"admin"`
|
||||||
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, sectorbuilder.SectorFileType) error `perm:"admin"`
|
||||||
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
|
StorageFindSector func(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]stores.StorageInfo, error) `perm:"admin"`
|
||||||
|
StorageList func(ctx context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
|
||||||
|
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
|
||||||
|
|
||||||
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
||||||
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||||
@ -664,6 +666,14 @@ func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.Secto
|
|||||||
return c.Internal.StorageFindSector(ctx, si, types)
|
return c.Internal.StorageFindSector(ctx, si, types)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *StorageMinerStruct) StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) {
|
||||||
|
return c.Internal.StorageList(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (stores.StorageInfo, error) {
|
||||||
|
return c.Internal.StorageInfo(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
|
func (c *StorageMinerStruct) MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error {
|
||||||
return c.Internal.MarketImportDealData(ctx, propcid, path)
|
return c.Internal.MarketImportDealData(ctx, propcid, path)
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@ -13,8 +14,6 @@ import (
|
|||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
|
|
||||||
paramfetch "github.com/filecoin-project/go-paramfetch"
|
paramfetch "github.com/filecoin-project/go-paramfetch"
|
||||||
manet "github.com/multiformats/go-multiaddr-net"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/api/apistruct"
|
"github.com/filecoin-project/lotus/api/apistruct"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -31,11 +30,6 @@ var log = logging.Logger("main")
|
|||||||
|
|
||||||
const FlagStorageRepo = "workerrepo"
|
const FlagStorageRepo = "workerrepo"
|
||||||
|
|
||||||
const (
|
|
||||||
workers = 1 // TODO: Configurability
|
|
||||||
transfers = 1
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
lotuslog.SetupLogLevels()
|
lotuslog.SetupLogLevels()
|
||||||
|
|
||||||
@ -81,11 +75,23 @@ func main() {
|
|||||||
var runCmd = &cli.Command{
|
var runCmd = &cli.Command{
|
||||||
Name: "run",
|
Name: "run",
|
||||||
Usage: "Start lotus worker",
|
Usage: "Start lotus worker",
|
||||||
|
Flags: []cli.Flag{
|
||||||
|
&cli.StringFlag{
|
||||||
|
Name: "address",
|
||||||
|
Usage: "Locally reachable address",
|
||||||
|
},
|
||||||
|
},
|
||||||
Action: func(cctx *cli.Context) error {
|
Action: func(cctx *cli.Context) error {
|
||||||
if !cctx.Bool("enable-gpu-proving") {
|
if !cctx.Bool("enable-gpu-proving") {
|
||||||
os.Setenv("BELLMAN_NO_GPU", "true")
|
os.Setenv("BELLMAN_NO_GPU", "true")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cctx.String("address") == "" {
|
||||||
|
return xerrors.Errorf("--address flag is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect to storage-miner
|
||||||
|
|
||||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting miner api: %w", err)
|
return xerrors.Errorf("getting miner api: %w", err)
|
||||||
@ -107,6 +113,8 @@ var runCmd = &cli.Command{
|
|||||||
log.Warn("Shutting down..")
|
log.Warn("Shutting down..")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Check params
|
||||||
|
|
||||||
act, err := nodeApi.ActorAddress(ctx)
|
act, err := nodeApi.ActorAddress(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -120,6 +128,8 @@ var runCmd = &cli.Command{
|
|||||||
return xerrors.Errorf("get params: %w", err)
|
return xerrors.Errorf("get params: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open repo
|
||||||
|
|
||||||
repoPath := cctx.String(FlagStorageRepo)
|
repoPath := cctx.String(FlagStorageRepo)
|
||||||
r, err := repo.NewFS(repoPath)
|
r, err := repo.NewFS(repoPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -144,16 +154,15 @@ var runCmd = &cli.Command{
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoint, err := r.APIEndpoint()
|
if err := stores.DeclareLocalStorage(
|
||||||
if err != nil {
|
ctx,
|
||||||
|
nodeApi,
|
||||||
|
localStore,
|
||||||
|
[]string{"http://" + cctx.String("address") + "/remote"}, // TODO: Less hardcoded
|
||||||
|
1); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Setup remote sector store
|
||||||
lst, err := manet.Listen(endpoint)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("could not listen: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, spt, err := api.ProofTypeFromSectorSize(ssize)
|
_, spt, err := api.ProofTypeFromSectorSize(ssize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting proof type: %w", err)
|
return xerrors.Errorf("getting proof type: %w", err)
|
||||||
@ -166,6 +175,8 @@ var runCmd = &cli.Command{
|
|||||||
|
|
||||||
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader())
|
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader())
|
||||||
|
|
||||||
|
// Create / expose the worker
|
||||||
|
|
||||||
workerApi := &worker{
|
workerApi := &worker{
|
||||||
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, stores.NewIndex()),
|
LocalWorker: advmgr.NewLocalWorker(act, spt, remote, localStore, stores.NewIndex()),
|
||||||
}
|
}
|
||||||
@ -195,8 +206,14 @@ var runCmd = &cli.Command{
|
|||||||
}
|
}
|
||||||
log.Warn("Graceful shutdown successful")
|
log.Warn("Graceful shutdown successful")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
|
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
|
||||||
|
|
||||||
return srv.Serve(manet.NetListener(lst))
|
nl, err := net.Listen("tcp4", cctx.String("address"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return srv.Serve(nl)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -392,11 +392,11 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
smgr, err := advmgr.New(lr, §orbuilder.Config{
|
smgr, err := advmgr.New(lr, stores.NewIndex(), §orbuilder.Config{
|
||||||
SealProofType: spt,
|
SealProofType: spt,
|
||||||
PoStProofType: ppt,
|
PoStProofType: ppt,
|
||||||
Miner: a,
|
Miner: a,
|
||||||
}, nil)
|
}, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||||
"github.com/filecoin-project/lotus/node"
|
"github.com/filecoin-project/lotus/node"
|
||||||
"github.com/filecoin-project/lotus/node/impl"
|
"github.com/filecoin-project/lotus/node/impl"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,10 +30,6 @@ var runCmd = &cli.Command{
|
|||||||
Name: "run",
|
Name: "run",
|
||||||
Usage: "Start a lotus storage miner process",
|
Usage: "Start a lotus storage miner process",
|
||||||
Flags: []cli.Flag{
|
Flags: []cli.Flag{
|
||||||
&cli.StringFlag{
|
|
||||||
Name: "api",
|
|
||||||
Value: "2345",
|
|
||||||
},
|
|
||||||
&cli.BoolFlag{
|
&cli.BoolFlag{
|
||||||
Name: "enable-gpu-proving",
|
Name: "enable-gpu-proving",
|
||||||
Usage: "enable use of GPU for mining operations",
|
Usage: "enable use of GPU for mining operations",
|
||||||
@ -93,13 +90,8 @@ var runCmd = &cli.Command{
|
|||||||
node.Repo(r),
|
node.Repo(r),
|
||||||
|
|
||||||
node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") },
|
node.ApplyIf(func(s *node.Settings) bool { return cctx.IsSet("api") },
|
||||||
node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error {
|
node.Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
|
||||||
apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" +
|
return multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("api"))
|
||||||
cctx.String("api"))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return lr.SetAPIEndpoint(apima)
|
|
||||||
})),
|
})),
|
||||||
node.Override(new(api.FullNode), nodeApi),
|
node.Override(new(api.FullNode), nodeApi),
|
||||||
)
|
)
|
||||||
|
@ -2,7 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -12,7 +12,10 @@ import (
|
|||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder"
|
||||||
|
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealmgr/stores"
|
||||||
)
|
)
|
||||||
|
|
||||||
const metaFile = "sectorstore.json"
|
const metaFile = "sectorstore.json"
|
||||||
@ -22,6 +25,7 @@ var storageCmd = &cli.Command{
|
|||||||
Usage: "manage sector storage",
|
Usage: "manage sector storage",
|
||||||
Subcommands: []*cli.Command{
|
Subcommands: []*cli.Command{
|
||||||
storageAttachCmd,
|
storageAttachCmd,
|
||||||
|
storageListCmd,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,3 +107,49 @@ var storageAttachCmd = &cli.Command{
|
|||||||
return nodeApi.StorageAddLocal(ctx, p)
|
return nodeApi.StorageAddLocal(ctx, p)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var storageListCmd = &cli.Command{
|
||||||
|
Name: "list",
|
||||||
|
Action: func(cctx *cli.Context) error {
|
||||||
|
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer closer()
|
||||||
|
ctx := lcli.ReqContext(cctx)
|
||||||
|
|
||||||
|
st, err := nodeApi.StorageList(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, sectors := range st {
|
||||||
|
var u, s, c int
|
||||||
|
for _, decl := range sectors {
|
||||||
|
if decl.SectorFileType§orbuilder.FTUnsealed > 0 {
|
||||||
|
u++
|
||||||
|
}
|
||||||
|
if decl.SectorFileType§orbuilder.FTSealed > 0 {
|
||||||
|
s++
|
||||||
|
}
|
||||||
|
if decl.SectorFileType§orbuilder.FTCache > 0 {
|
||||||
|
c++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("%s:\n", id)
|
||||||
|
fmt.Printf("\tUnsealed: %d; Sealed: %d; Caches: %d\n", u, s, c)
|
||||||
|
|
||||||
|
si, err := nodeApi.StorageInfo(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("\tSeal: %t; Store: %t; Cost: %d\n", si.CanSeal, si.CanStore, si.Cost)
|
||||||
|
for _, l := range si.URLs {
|
||||||
|
fmt.Printf("\tReachable %s\n", l) // TODO; try pinging maybe?? print latency?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
1
go.mod
1
go.mod
@ -101,6 +101,7 @@ require (
|
|||||||
go.uber.org/multierr v1.4.0
|
go.uber.org/multierr v1.4.0
|
||||||
go.uber.org/zap v1.13.0
|
go.uber.org/zap v1.13.0
|
||||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
|
||||||
|
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
|
||||||
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
|
gopkg.in/urfave/cli.v2 v2.0.0-20180128182452-d3ae77c26ac8
|
||||||
gotest.tools v2.2.0+incompatible
|
gotest.tools v2.2.0+incompatible
|
||||||
|
2
go.sum
2
go.sum
@ -96,7 +96,6 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn
|
|||||||
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
|
||||||
github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY=
|
github.com/fatih/color v1.8.0 h1:5bzFgL+oy7JITMTxUPJ00n7VxmYd/PdMp5mHFX40/RY=
|
||||||
github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8=
|
github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGjnw8=
|
||||||
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
||||||
@ -122,7 +121,6 @@ github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd h1
|
|||||||
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd/go.mod h1:rfRwhd3ujcCXnD4N9oEM2wjh8GRZGoeNXME+UPG/9ts=
|
github.com/filecoin-project/go-fil-markets v0.0.0-20200304003055-d449a980d4bd/go.mod h1:rfRwhd3ujcCXnD4N9oEM2wjh8GRZGoeNXME+UPG/9ts=
|
||||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|
||||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
|
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6/go.mod h1:0HgYnrkeSU4lu1p+LEOeDpFsNBssa0OGGriWdA4hvaE=
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
|
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA=
|
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663 h1:eYxi6vI5CyeXD15X1bB3bledDXbqKxqf0wQzTLgwYwA=
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200218225740-47c639bab663/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
|
||||||
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
|
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200226210935-4739f8749f56/go.mod h1:tzTc9BxxSbjlIzhFwm5h9oBkXKkRuLxeiWspntwnKyw=
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
record "github.com/libp2p/go-libp2p-record"
|
record "github.com/libp2p/go-libp2p-record"
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
manet "github.com/multiformats/go-multiaddr-net"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -318,15 +319,22 @@ func StorageMiner(out *api.StorageMiner) Option {
|
|||||||
func ConfigCommon(cfg *config.Common) Option {
|
func ConfigCommon(cfg *config.Common) Option {
|
||||||
return Options(
|
return Options(
|
||||||
func(s *Settings) error { s.Config = true; return nil },
|
func(s *Settings) error { s.Config = true; return nil },
|
||||||
|
Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
|
||||||
Override(SetApiEndpointKey, func(lr repo.LockedRepo) error {
|
return multiaddr.NewMultiaddr(cfg.API.ListenAddress)
|
||||||
apima, err := multiaddr.NewMultiaddr(cfg.API.ListenAddress)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return lr.SetAPIEndpoint(apima)
|
|
||||||
}),
|
}),
|
||||||
|
Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
|
||||||
|
return lr.SetAPIEndpoint(e)
|
||||||
|
}),
|
||||||
|
Override(new(advmgr.URLs), func(e dtypes.APIEndpoint) (advmgr.URLs, error) {
|
||||||
|
_, ip, err := manet.DialArgs(e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting api endpoint dial args: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var urls advmgr.URLs
|
||||||
|
urls = append(urls, "http://"+ip+"/remote") // TODO: This makes assumptions, and probably bad ones too
|
||||||
|
return urls, nil
|
||||||
|
}),
|
||||||
ApplyIf(func(s *Settings) bool { return s.Online },
|
ApplyIf(func(s *Settings) bool { return s.Online },
|
||||||
Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)),
|
Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)),
|
||||||
Override(ConnectionManagerKey, lp2p.ConnectionManager(
|
Override(ConnectionManagerKey, lp2p.ConnectionManager(
|
||||||
|
@ -1,5 +1,10 @@
|
|||||||
package dtypes
|
package dtypes
|
||||||
|
|
||||||
import "github.com/gbrlsnchs/jwt/v3"
|
import (
|
||||||
|
"github.com/gbrlsnchs/jwt/v3"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
)
|
||||||
|
|
||||||
type APIAlg jwt.HMACSHA
|
type APIAlg jwt.HMACSHA
|
||||||
|
|
||||||
|
type APIEndpoint multiaddr.Multiaddr
|
||||||
|
@ -24,6 +24,8 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("advmgr")
|
var log = logging.Logger("advmgr")
|
||||||
|
|
||||||
|
type URLs []string
|
||||||
|
|
||||||
type SectorIDCounter interface {
|
type SectorIDCounter interface {
|
||||||
Next() (abi.SectorNumber, error)
|
Next() (abi.SectorNumber, error)
|
||||||
}
|
}
|
||||||
@ -47,12 +49,16 @@ type Manager struct {
|
|||||||
storage2.Prover
|
storage2.Prover
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(ls stores.LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manager, error) {
|
func New(ls stores.LocalStorage, si *stores.Index, cfg *sectorbuilder.Config, sc SectorIDCounter, urls URLs) (*Manager, error) {
|
||||||
stor, err := stores.NewLocal(ls)
|
stor, err := stores.NewLocal(ls)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := stores.DeclareLocalStorage(context.TODO(), si, stor, urls, 10); err != nil {
|
||||||
|
log.Errorf("Declaring local storage failed: %+v")
|
||||||
|
}
|
||||||
|
|
||||||
mid, err := address.IDFromAddress(cfg.Miner)
|
mid, err := address.IDFromAddress(cfg.Miner)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting miner id: %w", err)
|
return nil, xerrors.Errorf("getting miner id: %w", err)
|
||||||
|
@ -34,7 +34,7 @@ type SectorIndex interface { // part of storage-miner api
|
|||||||
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
|
StorageFindSector(context.Context, abi.SectorID, sectorbuilder.SectorFileType) ([]StorageInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type decl struct {
|
type Decl struct {
|
||||||
abi.SectorID
|
abi.SectorID
|
||||||
sectorbuilder.SectorFileType
|
sectorbuilder.SectorFileType
|
||||||
}
|
}
|
||||||
@ -42,17 +42,42 @@ type decl struct {
|
|||||||
type Index struct {
|
type Index struct {
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
|
|
||||||
sectors map[decl][]ID
|
sectors map[Decl][]ID
|
||||||
stores map[ID]*StorageInfo
|
stores map[ID]*StorageInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndex() *Index {
|
func NewIndex() *Index {
|
||||||
return &Index{
|
return &Index{
|
||||||
sectors: map[decl][]ID{},
|
sectors: map[Decl][]ID{},
|
||||||
stores: map[ID]*StorageInfo{},
|
stores: map[ID]*StorageInfo{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
|
||||||
|
byID := map[ID]map[abi.SectorID]sectorbuilder.SectorFileType{}
|
||||||
|
|
||||||
|
for id := range i.stores {
|
||||||
|
byID[id] = map[abi.SectorID]sectorbuilder.SectorFileType{}
|
||||||
|
}
|
||||||
|
for decl, ids := range i.sectors {
|
||||||
|
for _, id := range ids {
|
||||||
|
byID[id][decl.SectorID] |= decl.SectorFileType
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out := map[ID][]Decl{}
|
||||||
|
for id, m := range byID {
|
||||||
|
for sectorID, fileType := range m {
|
||||||
|
out[id] = append(out[id], Decl{
|
||||||
|
SectorID: sectorID,
|
||||||
|
SectorFileType: fileType,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error {
|
func (i *Index) StorageAttach(ctx context.Context, si StorageInfo) error {
|
||||||
i.lk.Lock()
|
i.lk.Lock()
|
||||||
defer i.lk.Unlock()
|
defer i.lk.Unlock()
|
||||||
@ -80,7 +105,7 @@ func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.Se
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
d := decl{s, fileType}
|
d := Decl{s, fileType}
|
||||||
|
|
||||||
for _, sid := range i.sectors[d] {
|
for _, sid := range i.sectors[d] {
|
||||||
if sid == storageId {
|
if sid == storageId {
|
||||||
@ -99,7 +124,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
|
|||||||
i.lk.Lock()
|
i.lk.Lock()
|
||||||
defer i.lk.Unlock()
|
defer i.lk.Unlock()
|
||||||
|
|
||||||
storageIDs := i.sectors[decl{s, ft}]
|
storageIDs := i.sectors[Decl{s, ft}]
|
||||||
out := make([]StorageInfo, len(storageIDs))
|
out := make([]StorageInfo, len(storageIDs))
|
||||||
|
|
||||||
for j, id := range storageIDs {
|
for j, id := range storageIDs {
|
||||||
@ -132,4 +157,37 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft sector
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
|
||||||
|
si, found := i.stores[id]
|
||||||
|
if !found {
|
||||||
|
return StorageInfo{}, xerrors.Errorf("sector store not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return *si, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func DeclareLocalStorage(ctx context.Context, idx SectorIndex, localStore *Local, urls []string, cost int) error {
|
||||||
|
for _, path := range localStore.Local() {
|
||||||
|
err := idx.StorageAttach(ctx, StorageInfo{
|
||||||
|
ID: path.ID,
|
||||||
|
URLs: urls,
|
||||||
|
Cost: cost,
|
||||||
|
CanSeal: path.CanSeal,
|
||||||
|
CanStore: path.CanStore,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("attaching local storage to remote: %+v")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, fileType := range localStore.List(path.ID) {
|
||||||
|
if err := idx.StorageDeclareSector(ctx, path.ID, id, fileType); err != nil {
|
||||||
|
log.Errorf("declaring sector: %+v")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var _ SectorIndex = &Index{}
|
var _ SectorIndex = &Index{}
|
||||||
|
@ -274,3 +274,17 @@ func (st *Local) Local() []StoragePath {
|
|||||||
|
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (st *Local) List(id ID) map[abi.SectorID]sectorbuilder.SectorFileType {
|
||||||
|
out := map[abi.SectorID]sectorbuilder.SectorFileType{}
|
||||||
|
for _, p := range st.paths {
|
||||||
|
if p.meta.ID != id { // TODO: not very efficient
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, fileType := range p.sectors {
|
||||||
|
out[id] |= fileType
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user