diff --git a/api/api.go b/api/api.go index 5d46ef5fa..d5ba98f64 100644 --- a/api/api.go +++ b/api/api.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" "github.com/ipfs/go-filestore" ) @@ -115,4 +116,16 @@ type FullNode interface { // Full API is a low-level interface to the Filecoin network storage miner node type StorageMiner interface { Common + + // Temp api for testing + StoreGarbageData(context.Context) (uint64, error) + + // Get the status of a given sector by ID + SectorsStatus(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) + + // List all staged sectors + SectorsStagedList(context.Context) ([]sectorbuilder.StagedSectorMetadata, error) + + // Seal all staged sectors + SectorsStagedSeal(context.Context) error } diff --git a/api/struct.go b/api/struct.go index 430619d44..82379f278 100644 --- a/api/struct.go +++ b/api/struct.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" @@ -68,6 +69,11 @@ type StorageMinerStruct struct { CommonStruct Internal struct { + StoreGarbageData func(context.Context) (uint64, error) `perm:"write"` + + SectorsStatus func(context.Context, uint64) (sectorbuilder.SectorSealingStatus, error) `perm:"read"` + SectorsStagedList func(context.Context) ([]sectorbuilder.StagedSectorMetadata, error) `perm:"read"` + SectorsStagedSeal func(context.Context) error `perm:"write"` } } @@ -185,6 +191,25 @@ func (c *FullNodeStruct) ChainNotify(ctx context.Context) (<-chan *store.HeadCha return c.Internal.ChainNotify(ctx) } +func (c *StorageMinerStruct) StoreGarbageData(ctx context.Context) (uint64, error) { + return c.Internal.StoreGarbageData(ctx) +} + +// Get the status of a given sector by ID +func (c *StorageMinerStruct) SectorsStatus(ctx context.Context, sid uint64) (sectorbuilder.SectorSealingStatus, error) { + return c.Internal.SectorsStatus(ctx, sid) +} + +// List all staged sectors +func (c *StorageMinerStruct) SectorsStagedList(ctx context.Context) ([]sectorbuilder.StagedSectorMetadata, error) { + return c.Internal.SectorsStagedList(ctx) +} + +// Seal all staged sectors +func (c *StorageMinerStruct) SectorsStagedSeal(ctx context.Context) error { + return c.Internal.SectorsStagedSeal(ctx) +} + var _ Common = &CommonStruct{} var _ FullNode = &FullNodeStruct{} var _ StorageMiner = &StorageMinerStruct{} diff --git a/cli/cmd.go b/cli/cmd.go index 62775e635..9d444f533 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -27,19 +27,19 @@ const ( // ApiConnector returns API instance type ApiConnector func() api.FullNode -func GetAPI(ctx *cli.Context) (api.FullNode, error) { - r, err := repo.NewFS(ctx.String("repo")) +func getAPI(ctx *cli.Context, repoFlag string) (string, http.Header, error) { + r, err := repo.NewFS(ctx.String(repoFlag)) if err != nil { - return nil, err + return "", nil, err } ma, err := r.APIEndpoint() if err != nil { - return nil, xerrors.Errorf("failed to get api endpoint: %w", err) + return "", nil, xerrors.Errorf("failed to get api endpoint: %w", err) } _, addr, err := manet.DialArgs(ma) if err != nil { - return nil, err + return "", nil, err } var headers http.Header token, err := r.APIToken() @@ -50,7 +50,25 @@ func GetAPI(ctx *cli.Context) (api.FullNode, error) { headers.Add("Authorization", "Bearer "+string(token)) } - return client.NewFullNodeRPC("ws://"+addr+"/rpc/v0", headers) + return "ws://" + addr + "/rpc/v0", headers, nil +} + +func GetAPI(ctx *cli.Context) (api.FullNode, error) { + addr, headers, err := getAPI(ctx, "repo") + if err != nil { + return nil, err + } + + return client.NewFullNodeRPC(addr, headers) +} + +func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, error) { + addr, headers, err := getAPI(ctx, "storagerepo") + if err != nil { + return nil, err + } + + return client.NewStorageMinerRPC(addr, headers) } // ReqContext returns context for cli execution. Calling it for the first time diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index c625f4620..0b15bb611 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -154,7 +154,7 @@ var initCmd = &cli.Command{ return err } - log.Infof("Waiting for confirmation") + log.Infof("Waiting for confirmation (TODO: actually wait)") mw, err := api.ChainWaitMsg(ctx, signed.Cid()) if err != nil { diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index 390b50a37..d91745783 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -21,6 +21,8 @@ func main() { local := []*cli.Command{ runCmd, initCmd, + storeGarbageCmd, + sectorsCmd, } jaeger := tracing.SetupJaegerTracing("lotus") defer func() { diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 42a26eb23..80df5015b 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -2,6 +2,7 @@ package main import ( "net/http" + "os" "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" @@ -11,7 +12,9 @@ import ( lcli "github.com/filecoin-project/go-lotus/cli" "github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/repo" ) @@ -31,12 +34,20 @@ var runCmd = &cli.Command{ } ctx := lcli.ReqContext(cctx) + go func() { + // a hack for now to handle sigint + + <-ctx.Done() + os.Exit(0) + }() + v, err := nodeApi.Version(ctx) if err != nil { return err } - r, err := repo.NewFS(cctx.String(FlagStorageRepo)) + storageRepoPath := cctx.String(FlagStorageRepo) + r, err := repo.NewFS(storageRepoPath) if err != nil { return err } @@ -46,7 +57,7 @@ var runCmd = &cli.Command{ return err } if !ok { - return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) + return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", storageRepoPath) } var minerapi api.StorageMiner @@ -62,6 +73,7 @@ var runCmd = &cli.Command{ } return lr.SetAPIEndpoint(apima) }), + node.Override(new(*sectorbuilder.SectorBuilderConfig), modules.SectorBuilderConfig(storageRepoPath)), ) if err != nil { return err diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go new file mode 100644 index 000000000..3fd5aab07 --- /dev/null +++ b/cmd/lotus-storage-miner/sectors.go @@ -0,0 +1,112 @@ +package main + +import ( + "fmt" + "strconv" + + "gopkg.in/urfave/cli.v2" + + lcli "github.com/filecoin-project/go-lotus/cli" +) + +var storeGarbageCmd = &cli.Command{ + Name: "store-garbage", + Usage: "store random data in a sector", + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + sectorId, err := nodeApi.StoreGarbageData(ctx) + if err != nil { + return err + } + + fmt.Println(sectorId) + return nil + }, +} + +var sectorsCmd = &cli.Command{ + Name: "sectors", + Usage: "interact with sector store", + Subcommands: []*cli.Command{ + sectorsStatusCmd, + sectorsStagedListCmd, + sectorsStagedSealCmd, + }, +} + +var sectorsStatusCmd = &cli.Command{ + Name: "status", + Usage: "Get the seal status of a sector by its ID", + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + if !cctx.Args().Present() { + return fmt.Errorf("must specify sector ID to get status of") + } + + id, err := strconv.ParseUint(cctx.Args().First(), 10, 64) + if err != nil { + return err + } + + status, err := nodeApi.SectorsStatus(ctx, id) + if err != nil { + return err + } + + fmt.Printf("SectorID:\t%d\n", status.SectorID) + fmt.Printf("SealStatusCode:\t%d\n", status.SealStatusCode) + fmt.Printf("SealErrorMsg:\t%q\n", status.SealErrorMsg) + fmt.Printf("CommD:\t\t%x\n", status.CommD) + fmt.Printf("CommR:\t\t%x\n", status.CommR) + fmt.Printf("CommR*:\t\t%x\n", status.CommRStar) + fmt.Printf("Proof:\t\t%x\n", status.Proof) + fmt.Printf("Pieces:\t\t%v\n", status.Pieces) + return nil + }, +} + +var sectorsStagedListCmd = &cli.Command{ + Name: "list-staged", // TODO: nest this under a 'staged' subcommand? idk + Usage: "List staged sectors", + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + staged, err := nodeApi.SectorsStagedList(ctx) + if err != nil { + return err + } + + for _, s := range staged { + fmt.Println(s) + } + return nil + }, +} + +var sectorsStagedSealCmd = &cli.Command{ + Name: "seal-staged", // TODO: nest this under a 'staged' subcommand? idk + Usage: "Seal staged sectors", + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + return nodeApi.SectorsStagedSeal(ctx) + }, +} diff --git a/go.mod b/go.mod index e806e5c6d..d4ea9604b 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/BurntSushi/toml v0.3.1 github.com/filecoin-project/go-bls-sigs v0.0.0-20190718224239-4bc4b8a7bbf8 github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 + github.com/filecoin-project/go-sectorbuilder v0.0.0-00010101000000-000000000000 github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1 github.com/gorilla/websocket v1.4.0 github.com/ipfs/go-bitswap v0.1.5 diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go new file mode 100644 index 000000000..20bc8fb27 --- /dev/null +++ b/lib/sectorbuilder/sectorbuilder.go @@ -0,0 +1,83 @@ +package sectorbuilder + +import ( + "unsafe" + + "github.com/filecoin-project/go-lotus/chain/address" + sectorbuilder "github.com/filecoin-project/go-sectorbuilder" +) + +type SectorSealingStatus = sectorbuilder.SectorSealingStatus + +type StagedSectorMetadata = sectorbuilder.StagedSectorMetadata + +const CommLen = sectorbuilder.CommitmentBytesLen + +type SectorBuilder struct { + handle unsafe.Pointer +} + +type SectorBuilderConfig struct { + SectorSize uint64 + Miner address.Address + SealedDir string + StagedDir string + MetadataDir string +} + +func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) { + var proverId [31]byte + copy(proverId[:], cfg.Miner.Payload()) + + sbp, err := sectorbuilder.InitSectorBuilder(cfg.SectorSize, 2, 2, 1, cfg.MetadataDir, [31]byte{}, cfg.SealedDir, cfg.StagedDir, 16) + if err != nil { + return nil, err + } + + return &SectorBuilder{ + handle: sbp, + }, nil +} + +func (sb *SectorBuilder) Destroy() { + sectorbuilder.DestroySectorBuilder(sb.handle) +} + +func (sb *SectorBuilder) AddPiece(pieceKey string, pieceSize uint64, piecePath string) (uint64, error) { + return sectorbuilder.AddPiece(sb.handle, pieceKey, pieceSize, piecePath) +} + +// TODO: should *really really* return an io.ReadCloser +func (sb *SectorBuilder) ReadPieceFromSealedSector(pieceKey string) ([]byte, error) { + return sectorbuilder.ReadPieceFromSealedSector(sb.handle, pieceKey) +} + +func (sb *SectorBuilder) SealAllStagedSectors() error { + return sectorbuilder.SealAllStagedSectors(sb.handle) +} + +func (sb *SectorBuilder) SealStatus(sector uint64) (SectorSealingStatus, error) { + return sectorbuilder.GetSectorSealingStatusByID(sb.handle, sector) +} + +func (sb *SectorBuilder) GetAllStagedSectors() ([]StagedSectorMetadata, error) { + return sectorbuilder.GetAllStagedSectors(sb.handle) +} + +func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSeed [CommLen]byte) ([][]byte, []uint64, error) { + // Wait, this is a blocking method with no way of interrupting it? + // does it checkpoint itself? + return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed) +} + +var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector + +func VerifySeal(sectorSize uint64, commR, commD, commRStar [CommLen]byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) { + panic("TODO") + // return sectorbuilder.VerifySeal(sectorSize, commR, commD, commRStar, providerID, sectorID, proof) +} + +func VerifyPost(sectorSize uint64, sortedCommRs [][CommLen]byte, challengeSeed [CommLen]byte, proofs [][]byte, faults []uint64) (bool, error) { + // sectorbuilder.VerifyPost() + panic("no") +} diff --git a/node/builder.go b/node/builder.go index da79b06a8..3167d1d5b 100644 --- a/node/builder.go +++ b/node/builder.go @@ -28,6 +28,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/chain/wallet" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/config" "github.com/filecoin-project/go-lotus/node/hello" "github.com/filecoin-project/go-lotus/node/impl" @@ -211,7 +212,9 @@ func Online() Option { ), // Storage miner - + ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, + Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New), + ), ) } diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 197ca710a..013d04f04 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -1,11 +1,55 @@ package impl import ( + "context" + "fmt" + "io/ioutil" + "math/rand" + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" ) type StorageMinerAPI struct { CommonAPI + + SectorBuilder *sectorbuilder.SectorBuilder +} + +func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) { + maxSize := uint64(1016) // this is the most data we can fit in a 1024 byte sector + data := make([]byte, maxSize) + fi, err := ioutil.TempFile("", "lotus-garbage") + if err != nil { + return 0, err + } + + if _, err := fi.Write(data); err != nil { + return 0, err + } + fi.Close() + + name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) + sectorId, err := sm.SectorBuilder.AddPiece(name, maxSize, fi.Name()) + if err != nil { + return 0, err + } + + return sectorId, err +} + +func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (sectorbuilder.SectorSealingStatus, error) { + return sm.SectorBuilder.SealStatus(sid) +} + +// List all staged sectors +func (sm *StorageMinerAPI) SectorsStagedList(context.Context) ([]sectorbuilder.StagedSectorMetadata, error) { + return sm.SectorBuilder.GetAllStagedSectors() +} + +// Seal all staged sectors +func (sm *StorageMinerAPI) SectorsStagedSeal(context.Context) error { + return sm.SectorBuilder.SealAllStagedSectors() } var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/node/modules/core.go b/node/modules/core.go index cf7600ffd..96accfb74 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -26,12 +26,15 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" record "github.com/libp2p/go-libp2p-record" + "github.com/mitchellh/go-homedir" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/lib/sectorbuilder" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/repo" ) @@ -216,3 +219,30 @@ func LoadGenesis(genBytes []byte) func(blockstore.Blockstore) Genesis { } } } + +func SectorBuilderConfig(storagePath string) func() (*sectorbuilder.SectorBuilderConfig, error) { + return func() (*sectorbuilder.SectorBuilderConfig, error) { + sp, err := homedir.Expand(storagePath) + if err != nil { + return nil, err + } + + metadata := filepath.Join(sp, "meta") + sealed := filepath.Join(sp, "sealed") + staging := filepath.Join(sp, "staging") + + // TODO: get the address of the miner actor + minerAddr, err := address.NewIDAddress(42) + if err != nil { + return nil, err + } + + return §orbuilder.SectorBuilderConfig{ + Miner: minerAddr, + SectorSize: 1024, + MetadataDir: metadata, + SealedDir: sealed, + StagedDir: staging, + }, nil + } +}