diff --git a/.gitignore b/.gitignore index ee13fb270..2bbf35513 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ -lotus +/lotus +/lotus-storage-miner **/*.h **/*.a **/*.pc diff --git a/Makefile b/Makefile index 96f2b49dd..ce6b5512a 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ deps: $(BUILD_DEPS) build: $(BUILD_DEPS) go build -o lotus ./cmd/lotus + go build -o lotus-storage-miner ./cmd/lotus-storage-miner .PHONY: build clean: diff --git a/api/api.go b/api/api.go index ee5d3ea6b..4b308520c 100644 --- a/api/api.go +++ b/api/api.go @@ -3,19 +3,26 @@ package api import ( "context" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-filestore" - "github.com/libp2p/go-libp2p-core/peer" ) // Version provides various build-time information type Version struct { Version string + // APIVersion is a binary encoded semver version of the remote implementing + // this api + // + // See APIVersion in build/version.go + APIVersion uint32 + // TODO: git commit / os / genesis cid? } @@ -31,14 +38,29 @@ type MsgWait struct { Receipt types.MessageReceipt } -// API is a low-level interface to the Filecoin network -type API interface { +type Common interface { // Auth AuthVerify(ctx context.Context, token string) ([]string, error) AuthNew(ctx context.Context, perms []string) ([]byte, error) - // chain + // network + NetPeers(context.Context) ([]peer.AddrInfo, error) + NetConnect(context.Context, peer.AddrInfo) error + NetAddrsListen(context.Context) (peer.AddrInfo, error) + + // ID returns peerID of libp2p node backing this API + ID(context.Context) (peer.ID, error) + + // Version provides information about API provider + Version(context.Context) (Version, error) +} + +// FullNode API is a low-level interface to the Filecoin network full node +type FullNode interface { + Common + + // chain ChainHead(context.Context) (*chain.TipSet, error) // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *chain.TipSet) ([]byte, error) @@ -48,35 +70,13 @@ type API interface { // messages - // // wait - // // send - // // status - // // mpool - // // // ls / show / rm MpoolPending(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error) MpoolPush(context.Context, *chain.SignedMessage) error - // dag - - // // get block - // // (cli: show / info) - - // network - - NetPeers(context.Context) ([]peer.AddrInfo, error) - NetConnect(context.Context, peer.AddrInfo) error - NetAddrsListen(context.Context) (peer.AddrInfo, error) - // // ping - - // Struct + // FullNodeStruct // miner - // // create - // // owner - // // power - // // set-price - // // set-perrid MinerStart(context.Context, address.Address) error MinerCreateBlock(context.Context, address.Address, *chain.TipSet, []chain.Ticket, chain.ElectionProof, []*chain.SignedMessage) (*chain.BlockMsg, error) @@ -93,22 +93,6 @@ type API interface { // Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain... MpoolGetNonce(context.Context, address.Address) (uint64, error) - // // import - // // export - // // (on cli - cmd to list associations) - - // dht - - // // need ? - - // paych - - // // todo - - // retrieval - - // // retrieve piece - // Other // ClientImport imports file under the specified path into filestore @@ -121,10 +105,9 @@ type API interface { ClientListImports(ctx context.Context) ([]Import, error) //ClientListAsks() []Ask - - // ID returns peerID of libp2p node backing this API - ID(context.Context) (peer.ID, error) - - // Version provides information about API provider - Version(context.Context) (Version, error) +} + +// Full API is a low-level interface to the Filecoin network storage miner node +type StorageMiner interface { + Common } diff --git a/api/client/client.go b/api/client/client.go index 6a412956d..1bf04d6a2 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -7,9 +7,26 @@ import ( "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) -// NewRPC creates a new http jsonrpc client. -func NewRPC(addr string, requestHeader http.Header) (api.API, error) { - var res api.Struct - _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal, requestHeader) +// NewFullNodeRPC creates a new http jsonrpc client. +func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, error) { + var res api.FullNodeStruct + _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.CommonStruct.Internal, + &res.Internal, + }, requestHeader) + + return &res, err +} + +// NewStorageMinerRPC creates a new http jsonrpc client for storage miner +func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, error) { + var res api.StorageMinerStruct + _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.CommonStruct.Internal, + &res.Internal, + }, requestHeader) + return &res, err } diff --git a/api/permissioned.go b/api/permissioned.go index 78616518a..71c93f8e7 100644 --- a/api/permissioned.go +++ b/api/permissioned.go @@ -27,11 +27,23 @@ func WithPerm(ctx context.Context, perms []string) context.Context { return context.WithValue(ctx, permCtxKey, perms) } -func Permissioned(a API) API { - var out Struct +func PermissionedStorMinerAPI(a StorageMiner) StorageMiner { + var out StorageMinerStruct + permissionedAny(a, &out.Internal) + permissionedAny(a, &out.CommonStruct.Internal) + return &out +} - rint := reflect.ValueOf(&out.Internal).Elem() - ra := reflect.ValueOf(a) +func PermissionedFullAPI(a FullNode) FullNode { + var out FullNodeStruct + permissionedAny(a, &out.Internal) + permissionedAny(a, &out.CommonStruct.Internal) + return &out +} + +func permissionedAny(in interface{}, out interface{}) { + rint := reflect.ValueOf(out).Elem() + ra := reflect.ValueOf(in) for f := 0; f < rint.NumField(); f++ { field := rint.Type().Field(f) @@ -81,6 +93,4 @@ func Permissioned(a API) API { })) } - - return &out } diff --git a/api/struct.go b/api/struct.go index 3e238ab95..d9a629b35 100644 --- a/api/struct.go +++ b/api/struct.go @@ -14,15 +14,25 @@ import ( // All permissions are listed in permissioned.go var _ = AllPermissions -// Struct implements API passing calls to user-provided function values. -type Struct struct { +type CommonStruct struct { Internal struct { AuthVerify func(ctx context.Context, token string) ([]string, error) `perm:"read"` AuthNew func(ctx context.Context, perms []string) ([]byte, error) `perm:"admin"` + NetPeers func(context.Context) ([]peer.AddrInfo, error) `perm:"read"` + NetConnect func(context.Context, peer.AddrInfo) error `perm:"write"` + NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"` + ID func(context.Context) (peer.ID, error) `perm:"read"` Version func(context.Context) (Version, error) `perm:"read"` + } +} +// FullNodeStruct implements API passing calls to user-provided function values. +type FullNodeStruct struct { + CommonStruct + + Internal struct { ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"` ChainHead func(context.Context) (*chain.TipSet, error) `perm:"read"` ChainGetRandomness func(context.Context, *chain.TipSet) ([]byte, error) `perm:"read"` @@ -45,113 +55,118 @@ type Struct struct { ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"` - - NetPeers func(context.Context) ([]peer.AddrInfo, error) `perm:"read"` - NetConnect func(context.Context, peer.AddrInfo) error `perm:"write"` - NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"` } } -func (c *Struct) AuthVerify(ctx context.Context, token string) ([]string, error) { +type StorageMinerStruct struct { + CommonStruct + + Internal struct { + } +} + +func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]string, error) { return c.Internal.AuthVerify(ctx, token) } -func (c *Struct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { +func (c *CommonStruct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { return c.Internal.AuthNew(ctx, perms) } -func (c *Struct) ClientListImports(ctx context.Context) ([]Import, error) { - return c.Internal.ClientListImports(ctx) -} - -func (c *Struct) ClientImport(ctx context.Context, path string) (cid.Cid, error) { - return c.Internal.ClientImport(ctx, path) -} - -func (c *Struct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { - return c.Internal.MpoolPending(ctx, ts) -} - -func (c *Struct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { - return c.Internal.MpoolPush(ctx, smsg) -} - -func (c *Struct) MinerStart(ctx context.Context, addr address.Address) error { - return c.Internal.MinerStart(ctx, addr) -} - -func (c *Struct) MinerCreateBlock(ctx context.Context, addr address.Address, base *chain.TipSet, tickets []chain.Ticket, eproof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { - return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs) -} - -func (c *Struct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { +func (c *CommonStruct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { return c.Internal.NetPeers(ctx) } -func (c *Struct) NetConnect(ctx context.Context, p peer.AddrInfo) error { +func (c *CommonStruct) NetConnect(ctx context.Context, p peer.AddrInfo) error { return c.Internal.NetConnect(ctx, p) } -func (c *Struct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { +func (c *CommonStruct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { return c.Internal.NetAddrsListen(ctx) } -func (c *Struct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { - return c.Internal.ChainSubmitBlock(ctx, blk) -} - -func (c *Struct) ChainHead(ctx context.Context) (*chain.TipSet, error) { - return c.Internal.ChainHead(ctx) -} - -func (c *Struct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { - return c.Internal.ChainGetRandomness(ctx, pts) -} - -func (c *Struct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { - return c.Internal.ChainWaitMsg(ctx, msgc) -} - // ID implements API.ID -func (c *Struct) ID(ctx context.Context) (peer.ID, error) { +func (c *CommonStruct) ID(ctx context.Context) (peer.ID, error) { return c.Internal.ID(ctx) } // Version implements API.Version -func (c *Struct) Version(ctx context.Context) (Version, error) { +func (c *CommonStruct) Version(ctx context.Context) (Version, error) { return c.Internal.Version(ctx) } -func (c *Struct) WalletNew(ctx context.Context, typ string) (address.Address, error) { +func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]Import, error) { + return c.Internal.ClientListImports(ctx) +} + +func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid, error) { + return c.Internal.ClientImport(ctx, path) +} + +func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { + return c.Internal.MpoolPending(ctx, ts) +} + +func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { + return c.Internal.MpoolPush(ctx, smsg) +} + +func (c *FullNodeStruct) MinerStart(ctx context.Context, addr address.Address) error { + return c.Internal.MinerStart(ctx, addr) +} + +func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *chain.TipSet, tickets []chain.Ticket, eproof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { + return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs) +} + +func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { + return c.Internal.ChainSubmitBlock(ctx, blk) +} + +func (c *FullNodeStruct) ChainHead(ctx context.Context) (*chain.TipSet, error) { + return c.Internal.ChainHead(ctx) +} + +func (c *FullNodeStruct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { + return c.Internal.ChainGetRandomness(ctx, pts) +} + +func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { + return c.Internal.ChainWaitMsg(ctx, msgc) +} + +func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) { return c.Internal.WalletNew(ctx, typ) } -func (c *Struct) WalletList(ctx context.Context) ([]address.Address, error) { +func (c *FullNodeStruct) WalletList(ctx context.Context) ([]address.Address, error) { return c.Internal.WalletList(ctx) } -func (c *Struct) WalletBalance(ctx context.Context, a address.Address) (types.BigInt, error) { +func (c *FullNodeStruct) WalletBalance(ctx context.Context, a address.Address) (types.BigInt, error) { return c.Internal.WalletBalance(ctx, a) } -func (c *Struct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { +func (c *FullNodeStruct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { return c.Internal.WalletSign(ctx, k, msg) } -func (c *Struct) WalletDefaultAddress(ctx context.Context) (address.Address, error) { +func (c *FullNodeStruct) WalletDefaultAddress(ctx context.Context) (address.Address, error) { return c.Internal.WalletDefaultAddress(ctx) } -func (c *Struct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { +func (c *FullNodeStruct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { return c.Internal.MpoolGetNonce(ctx, addr) } -func (c *Struct) ChainGetBlock(ctx context.Context, b cid.Cid) (*chain.BlockHeader, error) { +func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*chain.BlockHeader, error) { return c.Internal.ChainGetBlock(ctx, b) } -func (c *Struct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*chain.SignedMessage, error) { +func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*chain.SignedMessage, error) { return c.Internal.ChainGetBlockMessages(ctx, b) } -var _ API = &Struct{} +var _ Common = &CommonStruct{} +var _ FullNode = &FullNodeStruct{} +var _ StorageMiner = &StorageMinerStruct{} diff --git a/api/test/test.go b/api/test/test.go index 53a92f3c4..714063bf7 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -11,7 +11,7 @@ import ( // APIBuilder is a function which is invoked in test suite to provide // test nodes and networks -type APIBuilder func(t *testing.T, n int) []api.API +type APIBuilder func(t *testing.T, n int) []api.FullNode type testSuite struct { makeNodes APIBuilder } diff --git a/build/version.go b/build/version.go index 39e2ca1e2..339a6bf8a 100644 --- a/build/version.go +++ b/build/version.go @@ -2,3 +2,20 @@ package build // Version is the local build version, set by build system const Version = "0.0.0" + +// APIVersion is a hex semver version of the rpc api exposed +// +// M M P +// A I A +// J N T +// O O C +// R R H +// |\vv/| +// vv vv +const APIVersion = 0x000001 + +const ( + MajorMask = 0xff0000 + MinorMask = 0xffff00 + PatchMask = 0xffffff +) diff --git a/cli/chain.go b/cli/chain.go index c310cdd4f..31999c55c 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -23,11 +23,11 @@ var chainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) head, err := api.ChainHead(ctx) if err != nil { @@ -51,11 +51,11 @@ var chainGetBlock = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) if !cctx.Args().Present() { return fmt.Errorf("must pass cid of block to print") diff --git a/cli/client.go b/cli/client.go index fd60990f7..b1b49e011 100644 --- a/cli/client.go +++ b/cli/client.go @@ -19,11 +19,11 @@ var clientImportCmd = &cli.Command{ Name: "import", Usage: "Import data", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) c, err := api.ClientImport(ctx, cctx.Args().First()) if err != nil { @@ -38,11 +38,11 @@ var clientLocalCmd = &cli.Command{ Name: "local", Usage: "List locally imported data", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) list, err := api.ClientListImports(ctx) if err != nil { diff --git a/cli/cmd.go b/cli/cmd.go index b66da51be..96857efac 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -23,9 +23,9 @@ const ( ) // ApiConnector returns API instance -type ApiConnector func() api.API +type ApiConnector func() api.FullNode -func getAPI(ctx *cli.Context) (api.API, error) { +func GetAPI(ctx *cli.Context) (api.FullNode, error) { r, err := repo.NewFS(ctx.String("repo")) if err != nil { return nil, err @@ -48,13 +48,13 @@ func getAPI(ctx *cli.Context) (api.API, error) { headers.Add("Authorization", "Bearer "+string(token)) } - return client.NewRPC("ws://"+addr+"/rpc/v0", headers) + return client.NewFullNodeRPC("ws://"+addr+"/rpc/v0", headers) } -// reqContext returns context for cli execution. Calling it for the first time +// ReqContext returns context for cli execution. Calling it for the first time // installs SIGTERM handler that will close returned context. // Not safe for concurrent execution. -func reqContext(cctx *cli.Context) context.Context { +func ReqContext(cctx *cli.Context) context.Context { if uctx, ok := cctx.App.Metadata[metadataContext]; ok { // unchecked cast as if something else is in there // it is crash worthy either way diff --git a/cli/createminer.go b/cli/createminer.go index f22931627..d0e29e862 100644 --- a/cli/createminer.go +++ b/cli/createminer.go @@ -23,7 +23,7 @@ var createMinerCmd = &cli.Command{ return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID") } - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } @@ -57,7 +57,7 @@ var createMinerCmd = &cli.Command{ PeerID: pid, } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addr, err := api.WalletDefaultAddress(ctx) if err != nil { return xerrors.Errorf("failed to get default address: %w", err) diff --git a/cli/miner.go b/cli/miner.go index 84ea68ce1..607f26512 100644 --- a/cli/miner.go +++ b/cli/miner.go @@ -21,12 +21,12 @@ var minerStart = &cli.Command{ Name: "start", Usage: "start mining", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) // TODO: this address needs to be the address of an actual miner maddr, err := address.NewIDAddress(523423423) diff --git a/cli/mpool.go b/cli/mpool.go index faffc21ae..4ff97daa3 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -18,12 +18,12 @@ var mpoolPending = &cli.Command{ Name: "pending", Usage: "Get pending messages", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) msgs, err := api.MpoolPending(ctx, nil) if err != nil { diff --git a/cli/net.go b/cli/net.go index a5660d404..ce254c20a 100644 --- a/cli/net.go +++ b/cli/net.go @@ -27,11 +27,11 @@ var netPeers = &cli.Command{ Name: "peers", Usage: "Print peers", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) peers, err := api.NetPeers(ctx) if err != nil { return err @@ -49,11 +49,11 @@ var netListen = &cli.Command{ Name: "listen", Usage: "List listen addresses", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addrs, err := api.NetAddrsListen(ctx) if err != nil { @@ -71,11 +71,11 @@ var netConnect = &cli.Command{ Name: "connect", Usage: "Connect to a peer", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) pis, err := parseAddresses(ctx, cctx.Args().Slice()) if err != nil { @@ -100,12 +100,12 @@ var netId = &cli.Command{ Name: "id", Usage: "Get node identity", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) pid, err := api.ID(ctx) if err != nil { diff --git a/cli/version.go b/cli/version.go index cb58c8ba9..2ccdb4360 100644 --- a/cli/version.go +++ b/cli/version.go @@ -10,12 +10,12 @@ var versionCmd = &cli.Command{ Name: "version", Usage: "Print version", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) // TODO: print more useful things fmt.Println(api.Version(ctx)) diff --git a/cli/wallet.go b/cli/wallet.go index ba8ce007a..b8372a749 100644 --- a/cli/wallet.go +++ b/cli/wallet.go @@ -21,11 +21,11 @@ var walletNew = &cli.Command{ Name: "new", Usage: "Generate a new key of the given type (bls or secp256k1)", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) t := cctx.Args().First() if t == "" { @@ -47,11 +47,11 @@ var walletList = &cli.Command{ Name: "list", Usage: "List wallet address", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addrs, err := api.WalletList(ctx) if err != nil { @@ -69,11 +69,11 @@ var walletBalance = &cli.Command{ Name: "balance", Usage: "get account balance", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addr, err := address.NewFromString(cctx.Args().First()) if err != nil { diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go new file mode 100644 index 000000000..c997e3062 --- /dev/null +++ b/cmd/lotus-storage-miner/init.go @@ -0,0 +1,63 @@ +package main + +import ( + "golang.org/x/xerrors" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/build" + lcli "github.com/filecoin-project/go-lotus/cli" + "github.com/filecoin-project/go-lotus/node/repo" +) + +var initCmd = &cli.Command{ + Name: "init", + Usage: "Initialize a lotus storage miner repo", + Action: func(cctx *cli.Context) error { + log.Info("Initializing lotus storage miner") + log.Info("Checking if repo exists") + + r, err := repo.NewFS(cctx.String(FlagStorageRepo)) + if err != nil { + return err + } + + ok, err := r.Exists() + if err != nil { + return err + } + if ok { + return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagStorageRepo)) + } + + log.Info("Trying to connect to full node RPC") + + api, err := lcli.GetAPI(cctx) // TODO: consider storing full node address in config + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + log.Info("Checking full node version") + + v, err := api.Version(ctx) + if err != nil { + return err + } + + if v.APIVersion&build.MinorMask != build.APIVersion&build.MinorMask { + return xerrors.Errorf("Remote API version didn't match (local %x, remote %x)", build.APIVersion, v.APIVersion) + } + + log.Info("Initializing repo") + + if err := r.Init(); err != nil { + return err + } + + // create actors and stuff + + log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'") + + return nil + }, +} diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go new file mode 100644 index 000000000..3954c4c3d --- /dev/null +++ b/cmd/lotus-storage-miner/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "os" + + logging "github.com/ipfs/go-log" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/build" + lcli "github.com/filecoin-project/go-lotus/cli" +) + +var log = logging.Logger("main") + +const FlagStorageRepo = "storagerepo" + +func main() { + logging.SetLogLevel("*", "INFO") + local := []*cli.Command{ + runCmd, + initCmd, + } + + app := &cli.App{ + Name: "lotus-storage-miner", + Usage: "Filecoin decentralized storage network storage miner", + Version: build.Version, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"LOTUS_PATH"}, + Hidden: true, + Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME + }, + &cli.StringFlag{ + Name: FlagStorageRepo, + EnvVars: []string{"LOTUS_STORAGE_PATH"}, + Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME + }, + }, + + Commands: append(local, lcli.Commands...), + } + + if err := app.Run(os.Args); err != nil { + log.Error(err) + return + } +} diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go new file mode 100644 index 000000000..1cc209532 --- /dev/null +++ b/cmd/lotus-storage-miner/run.go @@ -0,0 +1,85 @@ +package main + +import ( + "net/http" + + "github.com/multiformats/go-multiaddr" + "golang.org/x/xerrors" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/api" + lcli "github.com/filecoin-project/go-lotus/cli" + "github.com/filecoin-project/go-lotus/lib/auth" + "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/repo" +) + +var runCmd = &cli.Command{ + Name: "run", + Usage: "Start a lotus storage miner process", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "api", + Value: "2345", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, err := lcli.GetAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + v, err := nodeApi.Version(ctx) + if err != nil { + return err + } + + r, err := repo.NewFS(cctx.String(FlagStorageRepo)) + if err != nil { + return err + } + + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { + return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) + } + + var minerapi api.StorageMiner + err = node.New(ctx, + node.StorageMiner(&minerapi), + node.Online(), + node.Repo(r), + + node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("api")) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }), + ) + if err != nil { + return err + } + + // TODO: libp2p node + + log.Infof("Remote version %s", v) + + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) + + ah := &auth.Handler{ + Verify: minerapi.AuthVerify, + Next: rpcServer.ServeHTTP, + } + + http.Handle("/rpc/v0", ah) + return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) + }, +} diff --git a/daemon/cmd.go b/cmd/lotus/daemon.go similarity index 76% rename from daemon/cmd.go rename to cmd/lotus/daemon.go index 814325aa0..b8a88bc33 100644 --- a/daemon/cmd.go +++ b/cmd/lotus/daemon.go @@ -1,10 +1,11 @@ // +build !nodaemon -package daemon +package main import ( "context" + "github.com/filecoin-project/go-lotus/api" "github.com/multiformats/go-multiaddr" "gopkg.in/urfave/cli.v2" @@ -12,8 +13,8 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" ) -// Cmd is the `go-lotus daemon` command -var Cmd = &cli.Command{ +// DaemonCmd is the `go-lotus daemon` command +var DaemonCmd = &cli.Command{ Name: "daemon", Usage: "Start a lotus daemon process", Flags: []cli.Flag{ @@ -33,7 +34,10 @@ var Cmd = &cli.Command{ return err } - api, err := node.New(ctx, + var api api.FullNode + err = node.New(ctx, + node.FullAPI(&api), + node.Online(), node.Repo(r), @@ -49,12 +53,7 @@ var Cmd = &cli.Command{ return err } - // Write cli token to the repo if not there yet - if _, err := api.AuthNew(ctx, nil); err != nil { - return err - } - // TODO: properly parse api endpoint (or make it a URL) - return serveRPC(api, "127.0.0.1:"+cctx.String("api"), api.AuthVerify) + return serveRPC(api, "127.0.0.1:"+cctx.String("api")) }, } diff --git a/daemon/cmd_nodaemon.go b/cmd/lotus/daemon_nodaemon.go similarity index 78% rename from daemon/cmd_nodaemon.go rename to cmd/lotus/daemon_nodaemon.go index 3a4544e9c..05a8f97da 100644 --- a/daemon/cmd_nodaemon.go +++ b/cmd/lotus/daemon_nodaemon.go @@ -1,6 +1,6 @@ // +build nodaemon -package daemon +package main import ( "errors" @@ -8,8 +8,8 @@ import ( "gopkg.in/urfave/cli.v2" ) -// Cmd is the `go-lotus daemon` command -var Cmd = &cli.Command{ +// DaemonCmd is the `go-lotus daemon` command +var DaemonCmd = &cli.Command{ Name: "daemon", Usage: "Start a lotus daemon process", Flags: []cli.Flag{ diff --git a/cmd/lotus/main.go b/cmd/lotus/main.go index abda9b021..adb21b2e3 100644 --- a/cmd/lotus/main.go +++ b/cmd/lotus/main.go @@ -9,13 +9,12 @@ import ( "github.com/filecoin-project/go-lotus/build" lcli "github.com/filecoin-project/go-lotus/cli" - "github.com/filecoin-project/go-lotus/daemon" ) func main() { logging.SetLogLevel("*", "INFO") local := []*cli.Command{ - daemon.Cmd, + DaemonCmd, } app := &cli.App{ diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go new file mode 100644 index 000000000..a0be6ecbe --- /dev/null +++ b/cmd/lotus/rpc.go @@ -0,0 +1,22 @@ +package main + +import ( + "net/http" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/lib/auth" + "github.com/filecoin-project/go-lotus/lib/jsonrpc" +) + +func serveRPC(a api.FullNode, addr string) error { + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) + + ah := &auth.Handler{ + Verify: a.AuthVerify, + Next: rpcServer.ServeHTTP, + } + + http.Handle("/rpc/v0", ah) + return http.ListenAndServe(addr, http.DefaultServeMux) +} diff --git a/daemon/rpc.go b/daemon/rpc.go deleted file mode 100644 index 21afb1f8b..000000000 --- a/daemon/rpc.go +++ /dev/null @@ -1,23 +0,0 @@ -package daemon - -import ( - "context" - "github.com/filecoin-project/go-lotus/lib/auth" - "net/http" - - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/lib/jsonrpc" -) - -func serveRPC(a api.API, addr string, verify func(ctx context.Context, token string) ([]string, error)) error { - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", api.Permissioned(a)) - - authHandler := &auth.Handler{ - Verify: verify, - Next: rpcServer.ServeHTTP, - } - - http.Handle("/rpc/v0", authHandler) - return http.ListenAndServe(addr, http.DefaultServeMux) -} diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index aff8db320..448418c67 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -3,7 +3,6 @@ package jsonrpc import ( "context" "encoding/json" - "errors" "fmt" "net/http" "reflect" @@ -42,12 +41,14 @@ type clientResponse struct { Error *respError `json:"error,omitempty"` } +type makeChanSink func() (context.Context, func([]byte, bool)) + type clientRequest struct { req request ready chan clientResponse // retCh provides a context and sink for handling incoming channel messages - retCh func() (context.Context, func([]byte, bool)) + retCh makeChanSink } // ClientCloser is used to close Client from further use @@ -59,191 +60,245 @@ type ClientCloser func() // Returned value closes the client connection // TODO: Example func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) { - htyp := reflect.TypeOf(handler) - if htyp.Kind() != reflect.Ptr { - return nil, xerrors.New("expected handler to be a pointer") - } - typ := htyp.Elem() - if typ.Kind() != reflect.Struct { - return nil, xerrors.New("handler should be a struct") - } + return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader) +} - val := reflect.ValueOf(handler) +type client struct { + namespace string - var idCtr int64 + requests chan clientRequest + idCtr int64 +} +// NewMergeClient is like NewClient, but allows to specify multiple structs +// to be filled in the same namespace, using one connection +func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) if err != nil { return nil, err } + c := client{ + namespace: namespace, + } + stop := make(chan struct{}) - requests := make(chan clientRequest) + c.requests = make(chan clientRequest) handlers := map[string]rpcHandler{} go (&wsConn{ conn: conn, handler: handlers, - requests: requests, + requests: c.requests, stop: stop, }).handleWsConn(context.TODO()) - for i := 0; i < typ.NumField(); i++ { - f := typ.Field(i) - ftyp := f.Type - if ftyp.Kind() != reflect.Func { - return nil, xerrors.New("handler field not a func") + for _, handler := range outs { + htyp := reflect.TypeOf(handler) + if htyp.Kind() != reflect.Ptr { + return nil, xerrors.New("expected handler to be a pointer") + } + typ := htyp.Elem() + if typ.Kind() != reflect.Struct { + return nil, xerrors.New("handler should be a struct") } - valOut, errOut, nout := processFuncOut(ftyp) + val := reflect.ValueOf(handler) - processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = rval - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - if resp.Error != nil { - out[errOut].Set(reflect.ValueOf(resp.Error)) - } + for i := 0; i < typ.NumField(); i++ { + fn, err := c.makeRpcFunc(typ.Field(i)) + if err != nil { + return nil, err } - return out + val.Elem().Field(i).Set(fn) } - - processError := func(err error) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = reflect.New(ftyp.Out(valOut)).Elem() - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - out[errOut].Set(reflect.ValueOf(&ErrClient{err})) - } - - return out - } - - hasCtx := 0 - if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { - hasCtx = 1 - } - retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan - - fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { - id := atomic.AddInt64(&idCtr, 1) - params := make([]param, len(args)-hasCtx) - for i, arg := range args[hasCtx:] { - params[i] = param{ - v: arg, - } - } - - var ctx context.Context - if hasCtx == 1 { - ctx = args[0].Interface().(context.Context) - } - - var retVal reflect.Value - - // if the function returns a channel, we need to provide a sink for the - // messages - var chCtor func() (context.Context, func([]byte, bool)) - - if retCh { - retVal = reflect.Zero(ftyp.Out(valOut)) - - chCtor = func() (context.Context, func([]byte, bool)) { - // unpack chan type to make sure it's reflect.BothDir - ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) - ch := reflect.MakeChan(ctyp, 0) // todo: buffer? - retVal = ch.Convert(ftyp.Out(valOut)) - - return ctx, func(result []byte, ok bool) { - if !ok { - // remote channel closed, close ours too - ch.Close() - return - } - - val := reflect.New(ftyp.Out(valOut).Elem()) - if err := json.Unmarshal(result, val.Interface()); err != nil { - log.Errorf("error unmarshaling chan response: %s", err) - return - } - - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea - } - } - } - - req := request{ - Jsonrpc: "2.0", - ID: &id, - Method: namespace + "." + f.Name, - Params: params, - } - - rchan := make(chan clientResponse, 1) - requests <- clientRequest{ - req: req, - ready: rchan, - - retCh: chCtor, - } - var ctxDone <-chan struct{} - var resp clientResponse - - if ctx != nil { - ctxDone = ctx.Done() - } - - // wait for response, handle context cancellation - loop: - for { - select { - case resp = <-rchan: - break loop - case <-ctxDone: // send cancel request - ctxDone = nil - - requests <- clientRequest{ - req: request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, - }, - } - } - } - - if valOut != -1 && !retCh { - retVal = reflect.New(ftyp.Out(valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { - return processError(xerrors.Errorf("unmarshaling result: %w", err)) - } - } - - retVal = retVal.Elem() - } - - if resp.ID != *req.ID { - return processError(errors.New("request and response id didn't match")) - } - - return processResponse(resp, retVal) - }) - - val.Elem().Field(i).Set(fn) } return func() { close(stop) }, nil } + +func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) { + retVal := reflect.Zero(ftyp.Out(valOut)) + + chCtor := func() (context.Context, func([]byte, bool)) { + // unpack chan type to make sure it's reflect.BothDir + ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) + ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + retVal = ch.Convert(ftyp.Out(valOut)) + + return ctx, func(result []byte, ok bool) { + if !ok { + // remote channel closed, close ours too + ch.Close() + return + } + + val := reflect.New(ftyp.Out(valOut).Elem()) + if err := json.Unmarshal(result, val.Interface()); err != nil { + log.Errorf("error unmarshaling chan response: %s", err) + return + } + + ch.Send(val.Elem()) // todo: select on ctx is probably a good idea + } + } + + return func() reflect.Value { return retVal }, chCtor +} + +func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse { + rchan := make(chan clientResponse, 1) + c.requests <- clientRequest{ + req: req, + ready: rchan, + + retCh: chCtor, + } + var ctxDone <-chan struct{} + var resp clientResponse + + if ctx != nil { + ctxDone = ctx.Done() + } + + // wait for response, handle context cancellation +loop: + for { + select { + case resp = <-rchan: + break loop + case <-ctxDone: // send cancel request + ctxDone = nil + + c.requests <- clientRequest{ + req: request{ + Jsonrpc: "2.0", + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(*req.ID)}}, + }, + } + } + } + + return resp +} + +type rpcFunc struct { + client *client + + ftyp reflect.Type + name string + + nout int + valOut int + errOut int + + hasCtx int + retCh bool +} + +func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = rval + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + if resp.Error != nil { + out[fn.errOut].Set(reflect.ValueOf(resp.Error)) + } + } + + return out +} + +func (fn *rpcFunc) processError(err error) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem() + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err})) + } + + return out +} + +func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) { + id := atomic.AddInt64(&fn.client.idCtr, 1) + params := make([]param, len(args)-fn.hasCtx) + for i, arg := range args[fn.hasCtx:] { + params[i] = param{ + v: arg, + } + } + + var ctx context.Context + if fn.hasCtx == 1 { + ctx = args[0].Interface().(context.Context) + } + + retVal := func() reflect.Value { return reflect.Value{} } + + // if the function returns a channel, we need to provide a sink for the + // messages + var chCtor makeChanSink + if fn.retCh { + retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut) + } + + req := request{ + Jsonrpc: "2.0", + ID: &id, + Method: fn.client.namespace + "." + fn.name, + Params: params, + } + + resp := fn.client.sendRequest(ctx, req, chCtor) + + if resp.ID != *req.ID { + return fn.processError(xerrors.New("request and response id didn't match")) + } + + if fn.valOut != -1 && !fn.retCh { + val := reflect.New(fn.ftyp.Out(fn.valOut)) + + if resp.Result != nil { + log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) + if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { + return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) + } + } + + retVal = func() reflect.Value { return val.Elem() } + } + + return fn.processResponse(resp, retVal()) +} + +func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) { + ftyp := f.Type + if ftyp.Kind() != reflect.Func { + return reflect.Value{}, xerrors.New("handler field not a func") + } + + fun := &rpcFunc{ + client: c, + ftyp: ftyp, + name: f.Name, + } + fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) + + if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { + fun.hasCtx = 1 + } + fun.retCh = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan + + return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil +} diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index 009710f85..18d2da48f 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -3,9 +3,11 @@ package jsonrpc import ( "context" "encoding/json" - "github.com/gorilla/websocket" "io" "net/http" + "strings" + + "github.com/gorilla/websocket" ) const ( @@ -26,9 +28,21 @@ func NewServer() *RPCServer { } } -var upgrader = websocket.Upgrader{} +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http.Request) { + // TODO: allow setting + // (note that we still are mostly covered by jwt tokens) + w.Header().Set("Access-Control-Allow-Origin", "*") + if r.Header.Get("Sec-WebSocket-Protocol") != "" { + w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) + } + + c, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Error(err) @@ -51,7 +65,7 @@ func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if r.Header.Get("Connection") == "Upgrade" { + if strings.Contains(r.Header.Get("Connection"), "Upgrade") { s.handleWS(ctx, w, r) return } diff --git a/node/api.go b/node/api.go deleted file mode 100644 index 07e4a1911..000000000 --- a/node/api.go +++ /dev/null @@ -1,205 +0,0 @@ -package node - -import ( - "context" - - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/build" - "github.com/filecoin-project/go-lotus/chain" - "github.com/filecoin-project/go-lotus/chain/address" - "github.com/filecoin-project/go-lotus/chain/types" - "github.com/filecoin-project/go-lotus/miner" - "github.com/filecoin-project/go-lotus/node/client" - "github.com/filecoin-project/go-lotus/node/modules" - - "github.com/gbrlsnchs/jwt/v3" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - pubsub "github.com/libp2p/go-libp2p-pubsub" - ma "github.com/multiformats/go-multiaddr" - "golang.org/x/xerrors" -) - -var log = logging.Logger("node") - -type API struct { - client.LocalStorage - - Host host.Host - Chain *chain.ChainStore - PubSub *pubsub.PubSub - Mpool *chain.MessagePool - Wallet *chain.Wallet - APISecret *modules.APIAlg -} - -type jwtPayload struct { - Allow []string -} - -func (a *API) AuthVerify(ctx context.Context, token string) ([]string, error) { - var payload jwtPayload - if _, err := jwt.Verify([]byte(token), (*jwt.HMACSHA)(a.APISecret), &payload); err != nil { - return nil, xerrors.Errorf("JWT Verification failed: %w", err) - } - - return payload.Allow, nil -} - -func (a *API) AuthNew(ctx context.Context, perms []string) ([]byte, error) { - p := jwtPayload{ - Allow: perms, // TODO: consider checking validity - } - - return jwt.Sign(&p, (*jwt.HMACSHA)(a.APISecret)) -} - -func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { - if err := a.Chain.AddBlock(blk.Header); err != nil { - return err - } - - b, err := blk.Serialize() - if err != nil { - return err - } - - // TODO: anything else to do here? - return a.PubSub.Publish("/fil/blocks", b) -} - -func (a *API) ChainHead(context.Context) (*chain.TipSet, error) { - return a.Chain.GetHeaviestTipSet(), nil -} - -func (a *API) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { - // TODO: this needs to look back in the chain for the right random beacon value - return []byte("foo bar random"), nil -} - -func (a *API) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { - panic("TODO") -} - -func (a *API) ChainGetBlock(ctx context.Context, msg cid.Cid) (*chain.BlockHeader, error) { - return a.Chain.GetBlock(msg) -} - -func (a *API) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*chain.SignedMessage, error) { - b, err := a.Chain.GetBlock(msg) - if err != nil { - return nil, err - } - - return a.Chain.MessagesForBlock(b) -} - -func (a *API) ID(context.Context) (peer.ID, error) { - return a.Host.ID(), nil -} - -func (a *API) Version(context.Context) (api.Version, error) { - return api.Version{ - Version: build.Version, - }, nil -} - -func (a *API) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { - // TODO: need to make sure we don't return messages that were already included in the referenced chain - // also need to accept ts == nil just fine, assume nil == chain.Head() - return a.Mpool.Pending(), nil -} - -func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { - msgb, err := smsg.Serialize() - if err != nil { - return err - } - - return a.PubSub.Publish("/fil/messages", msgb) -} - -func (a *API) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { - return a.Mpool.GetNonce(addr) -} - -func (a *API) MinerStart(ctx context.Context, addr address.Address) error { - // hrm... - m := miner.NewMiner(a, addr) - - go m.Mine(context.TODO()) - - return nil -} - -func (a *API) MinerCreateBlock(ctx context.Context, addr address.Address, parents *chain.TipSet, tickets []chain.Ticket, proof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { - fblk, err := chain.MinerCreateBlock(a.Chain, addr, parents, tickets, proof, msgs) - if err != nil { - return nil, err - } - - var out chain.BlockMsg - out.Header = fblk.Header - for _, msg := range fblk.Messages { - out.Messages = append(out.Messages, msg.Cid()) - } - - return &out, nil -} - -func (a *API) NetPeers(context.Context) ([]peer.AddrInfo, error) { - conns := a.Host.Network().Conns() - out := make([]peer.AddrInfo, len(conns)) - - for i, conn := range conns { - out[i] = peer.AddrInfo{ - ID: conn.RemotePeer(), - Addrs: []ma.Multiaddr{ - conn.RemoteMultiaddr(), - }, - } - } - - return out, nil -} - -func (a *API) WalletNew(ctx context.Context, typ string) (address.Address, error) { - return a.Wallet.GenerateKey(typ) -} - -func (a *API) WalletList(ctx context.Context) ([]address.Address, error) { - return a.Wallet.ListAddrs() -} - -func (a *API) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { - return a.Chain.GetBalance(addr) -} - -func (a *API) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { - return a.Wallet.Sign(k, msg) -} - -func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) { - addrs, err := a.Wallet.ListAddrs() - if err != nil { - return address.Undef, err - } - - // TODO: store a default address in the config or 'wallet' portion of the repo - return addrs[0], nil -} - -func (a *API) NetConnect(ctx context.Context, p peer.AddrInfo) error { - return a.Host.Connect(ctx, p) -} - -func (a *API) NetAddrsListen(context.Context) (peer.AddrInfo, error) { - return peer.AddrInfo{ - ID: a.Host.ID(), - Addrs: a.Host.Addrs(), - }, nil -} - -var _ api.API = &API{} diff --git a/node/builder.go b/node/builder.go index e8148df62..0c5326cad 100644 --- a/node/builder.go +++ b/node/builder.go @@ -7,10 +7,10 @@ import ( "time" "github.com/ipfs/go-filestore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" - exchange "github.com/ipfs/go-ipfs-exchange-interface" ipld "github.com/ipfs/go-ipld-format" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/node/config" "github.com/filecoin-project/go-lotus/node/hello" + "github.com/filecoin-project/go-lotus/node/impl" "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/lp2p" @@ -71,11 +72,18 @@ const ( HandleIncomingMessagesKey // daemon + ExtractApiKey + SetApiEndpointKey _nInvokes // keep this last ) +const ( + nodeFull = iota + nodeStorageMiner +) + type Settings struct { // modules is a map of constructors for DI // @@ -88,6 +96,8 @@ type Settings struct { // type, and must be applied in correct order invokes []fx.Option + nodeType int + Online bool // Online option applied Config bool // Config option applied } @@ -118,23 +128,11 @@ func defaults() []Option { return []Option{ Override(new(helpers.MetricsCtx), context.Background), Override(new(record.Validator), modules.RecordValidator), - - // Filecoin modules - - Override(new(*chain.ChainStore), chain.NewChainStore), } } -// Online sets up basic libp2p node -func Online() Option { +func libp2p() Option { return Options( - // make sure that online is applied before Config. - // This is important because Config overrides some of Online units - func(s *Settings) error { s.Online = true; return nil }, - ApplyIf(func(s *Settings) bool { return s.Config }, - Error(errors.New("the Online option must be set before Config option")), - ), - Override(new(peerstore.Peerstore), pstoremem.NewPeerstore), Override(DefaultTransportsKey, lp2p.DefaultTransports), @@ -160,29 +158,76 @@ func Online() Option { Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)), + ) +} - // +// Online sets up basic libp2p node +func Online() Option { + return Options( + // make sure that online is applied before Config. + // This is important because Config overrides some of Online units + func(s *Settings) error { s.Online = true; return nil }, + ApplyIf(func(s *Settings) bool { return s.Config }, + Error(errors.New("the Online option must be set before Config option")), + ), - Override(new(blockstore.GCLocker), blockstore.NewGCLocker), - Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), - Override(new(exchange.Interface), modules.Bitswap), - Override(new(ipld.DAGService), testing.MemoryClientDag), + libp2p(), - // Filecoin services - Override(new(*chain.Syncer), chain.NewSyncer), - Override(new(*chain.BlockSync), chain.NewBlockSyncClient), - Override(new(*chain.Wallet), chain.NewWallet), - Override(new(*chain.MessagePool), chain.NewMessagePool), + // Full node - Override(new(modules.Genesis), testing.MakeGenesis), - Override(SetGenesisKey, modules.SetGenesis), + ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, + // TODO: Fix offline mode - Override(new(*hello.Service), hello.NewHelloService), - Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), - Override(RunHelloKey, modules.RunHello), - Override(RunBlockSyncKey, modules.RunBlockSync), - Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), - Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), + Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), + + Override(new(*chain.ChainStore), chain.NewChainStore), + + Override(new(blockstore.GCLocker), blockstore.NewGCLocker), + Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), + Override(new(exchange.Interface), modules.Bitswap), + Override(new(ipld.DAGService), testing.MemoryClientDag), + + // Filecoin services + Override(new(*chain.Syncer), chain.NewSyncer), + Override(new(*chain.BlockSync), chain.NewBlockSyncClient), + Override(new(*chain.Wallet), chain.NewWallet), + Override(new(*chain.MessagePool), chain.NewMessagePool), + + Override(new(modules.Genesis), testing.MakeGenesis), + Override(SetGenesisKey, modules.SetGenesis), + + Override(new(*hello.Service), hello.NewHelloService), + Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(RunHelloKey, modules.RunHello), + Override(RunBlockSyncKey, modules.RunBlockSync), + Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + ), + + // Storage miner + + ) +} + +func StorageMiner(out *api.StorageMiner) Option { + return Options( + ApplyIf(func(s *Settings) bool { return s.Config }, + Error(errors.New("the StorageMiner option must be set before Config option")), + ), + ApplyIf(func(s *Settings) bool { return s.Online }, + Error(errors.New("the StorageMiner option must be set before Online option")), + ), + + func(s *Settings) error { + s.nodeType = nodeStorageMiner + return nil + }, + + func(s *Settings) error { + resAPI := &impl.StorageMinerAPI{} + s.invokes[ExtractApiKey] = fx.Extract(resAPI) + *out = resAPI + return nil + }, ) } @@ -231,9 +276,17 @@ func Repo(r repo.Repo) Option { ) } +func FullAPI(out *api.FullNode) Option { + return func(s *Settings) error { + resAPI := &impl.FullNodeAPI{} + s.invokes[ExtractApiKey] = fx.Extract(resAPI) + *out = resAPI + return nil + } +} + // New builds and starts new Filecoin node -func New(ctx context.Context, opts ...Option) (api.API, error) { - resAPI := &API{} +func New(ctx context.Context, opts ...Option) error { settings := Settings{ modules: map[interface{}]fx.Option{}, invokes: make([]fx.Option, _nInvokes), @@ -241,7 +294,7 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { // apply module options in the right order if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { - return nil, err + return err } // gather constructors for fx.Options @@ -261,8 +314,6 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { fx.Options(ctors...), fx.Options(settings.invokes...), - fx.Extract(resAPI), - fx.NopLogger, ) @@ -270,10 +321,10 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { // on this context, and implement closing logic through lifecycles // correctly if err := app.Start(ctx); err != nil { - return nil, err + return err } - return resAPI, nil + return nil } // In-memory / testing diff --git a/node/impl/common.go b/node/impl/common.go new file mode 100644 index 000000000..9be1915ae --- /dev/null +++ b/node/impl/common.go @@ -0,0 +1,83 @@ +package impl + +import ( + "context" + + "github.com/gbrlsnchs/jwt/v3" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/node/modules" +) + +type CommonAPI struct { + fx.In + + APISecret *modules.APIAlg + Host host.Host +} + +type jwtPayload struct { + Allow []string +} + +func (a *CommonAPI) AuthVerify(ctx context.Context, token string) ([]string, error) { + var payload jwtPayload + if _, err := jwt.Verify([]byte(token), (*jwt.HMACSHA)(a.APISecret), &payload); err != nil { + return nil, xerrors.Errorf("JWT Verification failed: %w", err) + } + + return payload.Allow, nil +} + +func (a *CommonAPI) AuthNew(ctx context.Context, perms []string) ([]byte, error) { + p := jwtPayload{ + Allow: perms, // TODO: consider checking validity + } + + return jwt.Sign(&p, (*jwt.HMACSHA)(a.APISecret)) +} + +func (a *CommonAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) { + conns := a.Host.Network().Conns() + out := make([]peer.AddrInfo, len(conns)) + + for i, conn := range conns { + out[i] = peer.AddrInfo{ + ID: conn.RemotePeer(), + Addrs: []ma.Multiaddr{ + conn.RemoteMultiaddr(), + }, + } + } + + return out, nil +} + +func (a *CommonAPI) NetConnect(ctx context.Context, p peer.AddrInfo) error { + return a.Host.Connect(ctx, p) +} + +func (a *CommonAPI) NetAddrsListen(context.Context) (peer.AddrInfo, error) { + return peer.AddrInfo{ + ID: a.Host.ID(), + Addrs: a.Host.Addrs(), + }, nil +} + +func (a *CommonAPI) ID(context.Context) (peer.ID, error) { + return a.Host.ID(), nil +} + +func (a *CommonAPI) Version(context.Context) (api.Version, error) { + return api.Version{ + Version: build.Version, + }, nil +} + +var _ api.Common = &CommonAPI{} diff --git a/node/impl/full.go b/node/impl/full.go new file mode 100644 index 000000000..94f6249e7 --- /dev/null +++ b/node/impl/full.go @@ -0,0 +1,140 @@ +package impl + +import ( + "context" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/address" + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/miner" + "github.com/filecoin-project/go-lotus/node/client" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +var log = logging.Logger("node") + +type FullNodeAPI struct { + client.LocalStorage + + CommonAPI + + Chain *chain.ChainStore + PubSub *pubsub.PubSub + Mpool *chain.MessagePool + Wallet *chain.Wallet +} + +func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { + if err := a.Chain.AddBlock(blk.Header); err != nil { + return err + } + + b, err := blk.Serialize() + if err != nil { + return err + } + + // TODO: anything else to do here? + return a.PubSub.Publish("/fil/blocks", b) +} + +func (a *FullNodeAPI) ChainHead(context.Context) (*chain.TipSet, error) { + return a.Chain.GetHeaviestTipSet(), nil +} + +func (a *FullNodeAPI) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { + // TODO: this needs to look back in the chain for the right random beacon value + return []byte("foo bar random"), nil +} + +func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { + panic("TODO") +} + +func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*chain.BlockHeader, error) { + return a.Chain.GetBlock(msg) +} + +func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*chain.SignedMessage, error) { + b, err := a.Chain.GetBlock(msg) + if err != nil { + return nil, err + } + + return a.Chain.MessagesForBlock(b) +} + +func (a *FullNodeAPI) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { + // TODO: need to make sure we don't return messages that were already included in the referenced chain + // also need to accept ts == nil just fine, assume nil == chain.Head() + return a.Mpool.Pending(), nil +} + +func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { + msgb, err := smsg.Serialize() + if err != nil { + return err + } + + return a.PubSub.Publish("/fil/messages", msgb) +} + +func (a *FullNodeAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { + return a.Mpool.GetNonce(addr) +} + +func (a *FullNodeAPI) MinerStart(ctx context.Context, addr address.Address) error { + // hrm... + m := miner.NewMiner(a, addr) + + go m.Mine(context.TODO()) + + return nil +} + +func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *chain.TipSet, tickets []chain.Ticket, proof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { + fblk, err := chain.MinerCreateBlock(a.Chain, addr, parents, tickets, proof, msgs) + if err != nil { + return nil, err + } + + var out chain.BlockMsg + out.Header = fblk.Header + for _, msg := range fblk.Messages { + out.Messages = append(out.Messages, msg.Cid()) + } + + return &out, nil +} + +func (a *FullNodeAPI) WalletNew(ctx context.Context, typ string) (address.Address, error) { + return a.Wallet.GenerateKey(typ) +} + +func (a *FullNodeAPI) WalletList(ctx context.Context) ([]address.Address, error) { + return a.Wallet.ListAddrs() +} + +func (a *FullNodeAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { + return a.Chain.GetBalance(addr) +} + +func (a *FullNodeAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { + return a.Wallet.Sign(k, msg) +} + +func (a *FullNodeAPI) WalletDefaultAddress(ctx context.Context) (address.Address, error) { + addrs, err := a.Wallet.ListAddrs() + if err != nil { + return address.Undef, err + } + + // TODO: store a default address in the config or 'wallet' portion of the repo + return addrs[0], nil +} + +var _ api.FullNode = &FullNodeAPI{} diff --git a/node/impl/storminer.go b/node/impl/storminer.go new file mode 100644 index 000000000..197ca710a --- /dev/null +++ b/node/impl/storminer.go @@ -0,0 +1,11 @@ +package impl + +import ( + "github.com/filecoin-project/go-lotus/api" +) + +type StorageMinerAPI struct { + CommonAPI +} + +var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/node/modules/core.go b/node/modules/core.go index 47106019a..18ccf453d 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -3,13 +3,11 @@ package modules import ( "context" "crypto/rand" - "github.com/filecoin-project/go-lotus/api" - "github.com/gbrlsnchs/jwt/v3" - "golang.org/x/xerrors" "io" "io/ioutil" "path/filepath" + "github.com/gbrlsnchs/jwt/v3" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" @@ -27,7 +25,9 @@ import ( "github.com/libp2p/go-libp2p-core/routing" record "github.com/libp2p/go-libp2p-record" "go.uber.org/fx" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/node/modules/helpers" diff --git a/node/node_test.go b/node/node_test.go index a6c6d4f1f..2a49f6152 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -16,15 +16,16 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" ) -func builder(t *testing.T, n int) []api.API { +func builder(t *testing.T, n int) []api.FullNode { ctx := context.Background() mn := mocknet.New(ctx) - out := make([]api.API, n) + out := make([]api.FullNode, n) for i := 0; i < n; i++ { var err error - out[i], err = node.New(ctx, + err = node.New(ctx, + node.FullAPI(&out[i]), node.Online(), node.Repo(repo.NewMemory(nil)), MockHost(mn), @@ -47,9 +48,9 @@ func TestAPI(t *testing.T) { var nextApi int -func rpcBuilder(t *testing.T, n int) []api.API { +func rpcBuilder(t *testing.T, n int) []api.FullNode { nodeApis := builder(t, n) - out := make([]api.API, n) + out := make([]api.FullNode, n) for i, a := range nodeApis { rpcServer := jsonrpc.NewServer() @@ -57,7 +58,7 @@ func rpcBuilder(t *testing.T, n int) []api.API { testServ := httptest.NewServer(rpcServer) // todo: close var err error - out[i], err = client.NewRPC("ws://"+testServ.Listener.Addr().String(), nil) + out[i], err = client.NewFullNodeRPC("ws://"+testServ.Listener.Addr().String(), nil) if err != nil { t.Fatal(err) } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index e12a7fa3a..2ce0c0810 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -58,6 +58,15 @@ func NewFS(path string) (*FsRepo, error) { }, nil } +func (fsr *FsRepo) Exists() (bool, error) { + _, err := os.Stat(filepath.Join(fsr.path, fsConfig)) + notexist := os.IsNotExist(err) + if notexist { + err = nil + } + return !notexist, err +} + func (fsr *FsRepo) Init() error { if _, err := os.Stat(fsr.path); err == nil { return fsr.initKeystore() @@ -70,6 +79,14 @@ func (fsr *FsRepo) Init() error { if err != nil { return err } + c, err := os.Create(filepath.Join(fsr.path, fsConfig)) + if err != nil { + return err + } + if err := c.Close(); err != nil { + return err + } + return fsr.initKeystore() }