From b73f29286b8f735a275750664dde1942541da3e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 01:16:23 +0200 Subject: [PATCH 01/21] Begin implementing storage miner --- .gitignore | 1 + Makefile | 1 + cli/chain.go | 8 ++-- cli/client.go | 8 ++-- cli/cmd.go | 6 +-- cli/createminer.go | 4 +- cli/miner.go | 4 +- cli/mpool.go | 4 +- cli/net.go | 16 +++---- cli/version.go | 4 +- cli/wallet.go | 12 ++--- cmd/lotus-storage-miner/main.go | 47 +++++++++++++++++++ cmd/lotus-storage-miner/storageminer.go | 26 +++++++++++ cmd/lotus/main.go | 3 +- daemon/cmd.go | 60 ------------------------- daemon/cmd_nodaemon.go | 24 ---------- daemon/rpc.go | 23 ---------- 17 files changed, 109 insertions(+), 142 deletions(-) create mode 100644 cmd/lotus-storage-miner/main.go create mode 100644 cmd/lotus-storage-miner/storageminer.go delete mode 100644 daemon/cmd.go delete mode 100644 daemon/cmd_nodaemon.go delete mode 100644 daemon/rpc.go diff --git a/.gitignore b/.gitignore index ee13fb270..e35423f3b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ lotus +lotus-storage-miner **/*.h **/*.a **/*.pc diff --git a/Makefile b/Makefile index 96f2b49dd..ce6b5512a 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ deps: $(BUILD_DEPS) build: $(BUILD_DEPS) go build -o lotus ./cmd/lotus + go build -o lotus-storage-miner ./cmd/lotus-storage-miner .PHONY: build clean: diff --git a/cli/chain.go b/cli/chain.go index c310cdd4f..31999c55c 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -23,11 +23,11 @@ var chainHeadCmd = &cli.Command{ Name: "head", Usage: "Print chain head", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) head, err := api.ChainHead(ctx) if err != nil { @@ -51,11 +51,11 @@ var chainGetBlock = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) if !cctx.Args().Present() { return fmt.Errorf("must pass cid of block to print") diff --git a/cli/client.go b/cli/client.go index fd60990f7..b1b49e011 100644 --- a/cli/client.go +++ b/cli/client.go @@ -19,11 +19,11 @@ var clientImportCmd = &cli.Command{ Name: "import", Usage: "Import data", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) c, err := api.ClientImport(ctx, cctx.Args().First()) if err != nil { @@ -38,11 +38,11 @@ var clientLocalCmd = &cli.Command{ Name: "local", Usage: "List locally imported data", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) list, err := api.ClientListImports(ctx) if err != nil { diff --git a/cli/cmd.go b/cli/cmd.go index b66da51be..65de1a01f 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -25,7 +25,7 @@ const ( // ApiConnector returns API instance type ApiConnector func() api.API -func getAPI(ctx *cli.Context) (api.API, error) { +func GetAPI(ctx *cli.Context) (api.API, error) { r, err := repo.NewFS(ctx.String("repo")) if err != nil { return nil, err @@ -51,10 +51,10 @@ func getAPI(ctx *cli.Context) (api.API, error) { return client.NewRPC("ws://"+addr+"/rpc/v0", headers) } -// reqContext returns context for cli execution. Calling it for the first time +// ReqContext returns context for cli execution. Calling it for the first time // installs SIGTERM handler that will close returned context. // Not safe for concurrent execution. -func reqContext(cctx *cli.Context) context.Context { +func ReqContext(cctx *cli.Context) context.Context { if uctx, ok := cctx.App.Metadata[metadataContext]; ok { // unchecked cast as if something else is in there // it is crash worthy either way diff --git a/cli/createminer.go b/cli/createminer.go index f22931627..d0e29e862 100644 --- a/cli/createminer.go +++ b/cli/createminer.go @@ -23,7 +23,7 @@ var createMinerCmd = &cli.Command{ return fmt.Errorf("must pass four arguments: worker address, owner address, sector size, peer ID") } - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } @@ -57,7 +57,7 @@ var createMinerCmd = &cli.Command{ PeerID: pid, } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addr, err := api.WalletDefaultAddress(ctx) if err != nil { return xerrors.Errorf("failed to get default address: %w", err) diff --git a/cli/miner.go b/cli/miner.go index 84ea68ce1..607f26512 100644 --- a/cli/miner.go +++ b/cli/miner.go @@ -21,12 +21,12 @@ var minerStart = &cli.Command{ Name: "start", Usage: "start mining", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) // TODO: this address needs to be the address of an actual miner maddr, err := address.NewIDAddress(523423423) diff --git a/cli/mpool.go b/cli/mpool.go index faffc21ae..4ff97daa3 100644 --- a/cli/mpool.go +++ b/cli/mpool.go @@ -18,12 +18,12 @@ var mpoolPending = &cli.Command{ Name: "pending", Usage: "Get pending messages", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) msgs, err := api.MpoolPending(ctx, nil) if err != nil { diff --git a/cli/net.go b/cli/net.go index a5660d404..ce254c20a 100644 --- a/cli/net.go +++ b/cli/net.go @@ -27,11 +27,11 @@ var netPeers = &cli.Command{ Name: "peers", Usage: "Print peers", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) peers, err := api.NetPeers(ctx) if err != nil { return err @@ -49,11 +49,11 @@ var netListen = &cli.Command{ Name: "listen", Usage: "List listen addresses", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addrs, err := api.NetAddrsListen(ctx) if err != nil { @@ -71,11 +71,11 @@ var netConnect = &cli.Command{ Name: "connect", Usage: "Connect to a peer", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) pis, err := parseAddresses(ctx, cctx.Args().Slice()) if err != nil { @@ -100,12 +100,12 @@ var netId = &cli.Command{ Name: "id", Usage: "Get node identity", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) pid, err := api.ID(ctx) if err != nil { diff --git a/cli/version.go b/cli/version.go index cb58c8ba9..2ccdb4360 100644 --- a/cli/version.go +++ b/cli/version.go @@ -10,12 +10,12 @@ var versionCmd = &cli.Command{ Name: "version", Usage: "Print version", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) // TODO: print more useful things fmt.Println(api.Version(ctx)) diff --git a/cli/wallet.go b/cli/wallet.go index ba8ce007a..b8372a749 100644 --- a/cli/wallet.go +++ b/cli/wallet.go @@ -21,11 +21,11 @@ var walletNew = &cli.Command{ Name: "new", Usage: "Generate a new key of the given type (bls or secp256k1)", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) t := cctx.Args().First() if t == "" { @@ -47,11 +47,11 @@ var walletList = &cli.Command{ Name: "list", Usage: "List wallet address", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addrs, err := api.WalletList(ctx) if err != nil { @@ -69,11 +69,11 @@ var walletBalance = &cli.Command{ Name: "balance", Usage: "get account balance", Action: func(cctx *cli.Context) error { - api, err := getAPI(cctx) + api, err := GetAPI(cctx) if err != nil { return err } - ctx := reqContext(cctx) + ctx := ReqContext(cctx) addr, err := address.NewFromString(cctx.Args().First()) if err != nil { diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go new file mode 100644 index 000000000..14ba2d100 --- /dev/null +++ b/cmd/lotus-storage-miner/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "os" + + logging "github.com/ipfs/go-log" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/build" + lcli "github.com/filecoin-project/go-lotus/cli" +) + +var log = logging.Logger("main") + +func main() { + logging.SetLogLevel("*", "INFO") + local := []*cli.Command{ + RunCmd, + } + + app := &cli.App{ + Name: "lotus-storage-miner", + Usage: "Filecoin decentralized storage network storage miner", + Version: build.Version, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"LOTUS_PATH"}, + Hidden: true, + Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME + }, + &cli.StringFlag{ + Name: "storagerepo", + EnvVars: []string{"LOTUS_PATH"}, + Hidden: true, + Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME + }, + }, + + Commands: append(local, lcli.Commands...), + } + + if err := app.Run(os.Args); err != nil { + log.Error(err) + return + } +} diff --git a/cmd/lotus-storage-miner/storageminer.go b/cmd/lotus-storage-miner/storageminer.go new file mode 100644 index 000000000..956c0ccca --- /dev/null +++ b/cmd/lotus-storage-miner/storageminer.go @@ -0,0 +1,26 @@ +package main + +import ( + "gopkg.in/urfave/cli.v2" + + lcli "github.com/filecoin-project/go-lotus/cli" +) + +var RunCmd = &cli.Command{ + Name: "run", + Usage: "Start a lotus storage miner process", + Action: func(cctx *cli.Context) error { + api, err := lcli.GetAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + v, err := api.Version(ctx) + + // TODO: libp2p node + + log.Infof("Remote version %s", v) + return nil + }, +} diff --git a/cmd/lotus/main.go b/cmd/lotus/main.go index abda9b021..adb21b2e3 100644 --- a/cmd/lotus/main.go +++ b/cmd/lotus/main.go @@ -9,13 +9,12 @@ import ( "github.com/filecoin-project/go-lotus/build" lcli "github.com/filecoin-project/go-lotus/cli" - "github.com/filecoin-project/go-lotus/daemon" ) func main() { logging.SetLogLevel("*", "INFO") local := []*cli.Command{ - daemon.Cmd, + DaemonCmd, } app := &cli.App{ diff --git a/daemon/cmd.go b/daemon/cmd.go deleted file mode 100644 index 814325aa0..000000000 --- a/daemon/cmd.go +++ /dev/null @@ -1,60 +0,0 @@ -// +build !nodaemon - -package daemon - -import ( - "context" - - "github.com/multiformats/go-multiaddr" - "gopkg.in/urfave/cli.v2" - - "github.com/filecoin-project/go-lotus/node" - "github.com/filecoin-project/go-lotus/node/repo" -) - -// Cmd is the `go-lotus daemon` command -var Cmd = &cli.Command{ - Name: "daemon", - Usage: "Start a lotus daemon process", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "api", - Value: "1234", - }, - }, - Action: func(cctx *cli.Context) error { - ctx := context.Background() - r, err := repo.NewFS(cctx.String("repo")) - if err != nil { - return err - } - - if err := r.Init(); err != nil && err != repo.ErrRepoExists { - return err - } - - api, err := node.New(ctx, - node.Online(), - node.Repo(r), - - node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { - apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("api")) - if err != nil { - return err - } - return lr.SetAPIEndpoint(apima) - }), - ) - if err != nil { - return err - } - - // Write cli token to the repo if not there yet - if _, err := api.AuthNew(ctx, nil); err != nil { - return err - } - - // TODO: properly parse api endpoint (or make it a URL) - return serveRPC(api, "127.0.0.1:"+cctx.String("api"), api.AuthVerify) - }, -} diff --git a/daemon/cmd_nodaemon.go b/daemon/cmd_nodaemon.go deleted file mode 100644 index 3a4544e9c..000000000 --- a/daemon/cmd_nodaemon.go +++ /dev/null @@ -1,24 +0,0 @@ -// +build nodaemon - -package daemon - -import ( - "errors" - - "gopkg.in/urfave/cli.v2" -) - -// Cmd is the `go-lotus daemon` command -var Cmd = &cli.Command{ - Name: "daemon", - Usage: "Start a lotus daemon process", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "api", - Value: ":1234", - }, - }, - Action: func(cctx *cli.Context) error { - return errors.New("daemon support not included in this binary") - }, -} diff --git a/daemon/rpc.go b/daemon/rpc.go deleted file mode 100644 index 21afb1f8b..000000000 --- a/daemon/rpc.go +++ /dev/null @@ -1,23 +0,0 @@ -package daemon - -import ( - "context" - "github.com/filecoin-project/go-lotus/lib/auth" - "net/http" - - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/lib/jsonrpc" -) - -func serveRPC(a api.API, addr string, verify func(ctx context.Context, token string) ([]string, error)) error { - rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", api.Permissioned(a)) - - authHandler := &auth.Handler{ - Verify: verify, - Next: rpcServer.ServeHTTP, - } - - http.Handle("/rpc/v0", authHandler) - return http.ListenAndServe(addr, http.DefaultServeMux) -} From 145814d5c48e9a41979ec506cf95595fe9a8eaf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 01:18:11 +0200 Subject: [PATCH 02/21] Fix gitignore --- .gitignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index e35423f3b..2bbf35513 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ -lotus -lotus-storage-miner +/lotus +/lotus-storage-miner **/*.h **/*.a **/*.pc From a5441f8d177b602e6b5ef47eda3a44518d462fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 01:18:26 +0200 Subject: [PATCH 03/21] Add missing daemon files --- cmd/lotus/daemon.go | 54 ++++++++++++++++++++++++++++++++++++ cmd/lotus/daemon_nodaemon.go | 24 ++++++++++++++++ cmd/lotus/rpc.go | 15 ++++++++++ 3 files changed, 93 insertions(+) create mode 100644 cmd/lotus/daemon.go create mode 100644 cmd/lotus/daemon_nodaemon.go create mode 100644 cmd/lotus/rpc.go diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go new file mode 100644 index 000000000..3ad9f4620 --- /dev/null +++ b/cmd/lotus/daemon.go @@ -0,0 +1,54 @@ +// +build !nodaemon + +package main + +import ( + "context" + "github.com/multiformats/go-multiaddr" + "gopkg.in/urfave/cli.v2" + + "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/repo" +) + +// DaemonCmd is the `go-lotus daemon` command +var DaemonCmd = &cli.Command{ + Name: "daemon", + Usage: "Start a lotus daemon process", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "api", + Value: "1234", + }, + }, + Action: func(cctx *cli.Context) error { + ctx := context.Background() + r, err := repo.NewFS(cctx.String("repo")) + if err != nil { + return err + } + + if err := r.Init(); err != nil && err != repo.ErrRepoExists { + return err + } + + api, err := node.New(ctx, + node.Online(), + node.Repo(r), + + node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("api")) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }), + ) + if err != nil { + return err + } + + // TODO: properly parse api endpoint (or make it a URL) + return serveRPC(api, "127.0.0.1:"+cctx.String("api")) + }, +} diff --git a/cmd/lotus/daemon_nodaemon.go b/cmd/lotus/daemon_nodaemon.go new file mode 100644 index 000000000..bbaaa4963 --- /dev/null +++ b/cmd/lotus/daemon_nodaemon.go @@ -0,0 +1,24 @@ +// +build nodaemon + +package main + +import ( + "errors" + + "gopkg.in/urfave/cli.v2" +) + +// DaemonCmd is the `go-lotus daemon` command +var Cmd = &cli.Command{ + Name: "daemon", + Usage: "Start a lotus daemon process", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "api", + Value: ":1234", + }, + }, + Action: func(cctx *cli.Context) error { + return errors.New("daemon support not included in this binary") + }, +} diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go new file mode 100644 index 000000000..8801bd0c4 --- /dev/null +++ b/cmd/lotus/rpc.go @@ -0,0 +1,15 @@ +package main + +import ( + "net/http" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/lib/jsonrpc" +) + +func serveRPC(api api.API, addr string) error { + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", api) + http.Handle("/rpc/v0", rpcServer) + return http.ListenAndServe(addr, http.DefaultServeMux) +} From 486598ef53e47433b524bf4480394d8bfa1d90f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 01:44:59 +0200 Subject: [PATCH 04/21] storageminer: separate env var for storage repo --- cmd/lotus-storage-miner/main.go | 2 +- cmd/lotus-storage-miner/{storageminer.go => run.go} | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) rename cmd/lotus-storage-miner/{storageminer.go => run.go} (70%) diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index 14ba2d100..a6122ff9c 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -31,7 +31,7 @@ func main() { }, &cli.StringFlag{ Name: "storagerepo", - EnvVars: []string{"LOTUS_PATH"}, + EnvVars: []string{"LOTUS_STORAGE_PATH"}, Hidden: true, Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME }, diff --git a/cmd/lotus-storage-miner/storageminer.go b/cmd/lotus-storage-miner/run.go similarity index 70% rename from cmd/lotus-storage-miner/storageminer.go rename to cmd/lotus-storage-miner/run.go index 956c0ccca..d4b1a2ba3 100644 --- a/cmd/lotus-storage-miner/storageminer.go +++ b/cmd/lotus-storage-miner/run.go @@ -9,6 +9,13 @@ import ( var RunCmd = &cli.Command{ Name: "run", Usage: "Start a lotus storage miner process", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "storagerepo", + EnvVars: []string{"LOTUS_STORAGE_PATH"}, + Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME + }, + }, Action: func(cctx *cli.Context) error { api, err := lcli.GetAPI(cctx) if err != nil { From fe147ce90d7f14cb9b1c5587f4f7a5cc5bde759e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 11:23:24 +0200 Subject: [PATCH 05/21] StorageMiner node type --- node/builder.go | 97 ++++++++++++++++++++++++++++++--------------- node/repo/fsrepo.go | 5 +++ 2 files changed, 71 insertions(+), 31 deletions(-) diff --git a/node/builder.go b/node/builder.go index e8148df62..aec19c14f 100644 --- a/node/builder.go +++ b/node/builder.go @@ -7,10 +7,10 @@ import ( "time" "github.com/ipfs/go-filestore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" - exchange "github.com/ipfs/go-ipfs-exchange-interface" ipld "github.com/ipfs/go-ipld-format" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" @@ -76,6 +76,11 @@ const ( _nInvokes // keep this last ) +const ( + nodeFull = iota + nodeStorageMiner +) + type Settings struct { // modules is a map of constructors for DI // @@ -88,6 +93,8 @@ type Settings struct { // type, and must be applied in correct order invokes []fx.Option + nodeType int + Online bool // Online option applied Config bool // Config option applied } @@ -125,16 +132,8 @@ func defaults() []Option { } } -// Online sets up basic libp2p node -func Online() Option { +func libp2p() Option { return Options( - // make sure that online is applied before Config. - // This is important because Config overrides some of Online units - func(s *Settings) error { s.Online = true; return nil }, - ApplyIf(func(s *Settings) bool { return s.Config }, - Error(errors.New("the Online option must be set before Config option")), - ), - Override(new(peerstore.Peerstore), pstoremem.NewPeerstore), Override(DefaultTransportsKey, lp2p.DefaultTransports), @@ -160,29 +159,65 @@ func Online() Option { Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)), + ) +} - // - - Override(new(blockstore.GCLocker), blockstore.NewGCLocker), - Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), - Override(new(exchange.Interface), modules.Bitswap), - Override(new(ipld.DAGService), testing.MemoryClientDag), - - // Filecoin services - Override(new(*chain.Syncer), chain.NewSyncer), - Override(new(*chain.BlockSync), chain.NewBlockSyncClient), - Override(new(*chain.Wallet), chain.NewWallet), - Override(new(*chain.MessagePool), chain.NewMessagePool), - - Override(new(modules.Genesis), testing.MakeGenesis), - Override(SetGenesisKey, modules.SetGenesis), - - Override(new(*hello.Service), hello.NewHelloService), - Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), - Override(RunHelloKey, modules.RunHello), - Override(RunBlockSyncKey, modules.RunBlockSync), - Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), +// Online sets up basic libp2p node +func Online() Option { + return Options( Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), + // make sure that online is applied before Config. + // This is important because Config overrides some of Online units + func(s *Settings) error { s.Online = true; return nil }, + ApplyIf(func(s *Settings) bool { return s.Config }, + Error(errors.New("the Online option must be set before Config option")), + ), + + libp2p(), + + // Full node + + ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, + Override(new(blockstore.GCLocker), blockstore.NewGCLocker), + Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), + Override(new(exchange.Interface), modules.Bitswap), + Override(new(ipld.DAGService), testing.MemoryClientDag), + + // Filecoin services + Override(new(*chain.Syncer), chain.NewSyncer), + Override(new(*chain.BlockSync), chain.NewBlockSyncClient), + Override(new(*chain.Wallet), chain.NewWallet), + Override(new(*chain.MessagePool), chain.NewMessagePool), + + Override(new(modules.Genesis), testing.MakeGenesis), + Override(SetGenesisKey, modules.SetGenesis), + + Override(new(*hello.Service), hello.NewHelloService), + Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(RunHelloKey, modules.RunHello), + Override(RunBlockSyncKey, modules.RunBlockSync), + Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + ), + + // Storage miner + + + ) +} + +func StorageMiner() Option { + return Options( + ApplyIf(func(s *Settings) bool { return s.Config }, + Error(errors.New("the StorageMiner option must be set before Config option")), + ), + ApplyIf(func(s *Settings) bool { return s.Online }, + Error(errors.New("the StorageMiner option must be set before Online option")), + ), + + func(s *Settings) error { + s.nodeType = nodeStorageMiner + return nil + }, ) } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index e12a7fa3a..72e0e6173 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -58,6 +58,11 @@ func NewFS(path string) (*FsRepo, error) { }, nil } +func (fsr *FsRepo) Exists() bool { + _, err := os.Stat(fsr.path) + return err == nil +} + func (fsr *FsRepo) Init() error { if _, err := os.Stat(fsr.path); err == nil { return fsr.initKeystore() From 0e570c0b1943c5123746de49ef4b5187e97f45e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 11:24:11 +0200 Subject: [PATCH 06/21] StorageMiner init code --- cmd/lotus-storage-miner/init.go | 43 ++++++++++++++++++++++++++++++++ cmd/lotus-storage-miner/run.go | 44 ++++++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 cmd/lotus-storage-miner/init.go diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go new file mode 100644 index 000000000..618e5de1e --- /dev/null +++ b/cmd/lotus-storage-miner/init.go @@ -0,0 +1,43 @@ +package main + +import ( + "golang.org/x/xerrors" + "gopkg.in/urfave/cli.v2" + + lcli "github.com/filecoin-project/go-lotus/cli" + "github.com/filecoin-project/go-lotus/node/repo" +) + +var initCmd = &cli.Command{ + Name: "init", + Usage: "Initialize a lotus storage miner repo", + Action: func(cctx *cli.Context) error { + log.Info("Initializing lotus storage miner") + log.Info("Checking if repo exists") + + r, err := repo.NewFS(cctx.String("storagerepo")) + if err != nil { + return err + } + + if r.Exists() { + return xerrors.Errorf("repo at '%s' is already initialized", cctx.String("storagerepo")) + } + + log.Info("Trying to connect to full node RPC") + + api, err := lcli.GetAPI(cctx) + if err != nil { + return err + } + ctx := lcli.ReqContext(cctx) + + log.Info("Checking full node version") + + if err := r.Init(); err != nil { + return err + } + + return nil + }, +} diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index d4b1a2ba3..be870378f 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -1,9 +1,16 @@ package main import ( + "net/http" + + "github.com/multiformats/go-multiaddr" + "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" lcli "github.com/filecoin-project/go-lotus/cli" + "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/repo" ) var RunCmd = &cli.Command{ @@ -15,6 +22,10 @@ var RunCmd = &cli.Command{ EnvVars: []string{"LOTUS_STORAGE_PATH"}, Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME }, + &cli.StringFlag{ + Name: "api", + Value: "2345", + }, }, Action: func(cctx *cli.Context) error { api, err := lcli.GetAPI(cctx) @@ -25,9 +36,40 @@ var RunCmd = &cli.Command{ v, err := api.Version(ctx) + r, err := repo.NewFS(cctx.String("storagerepo")) + if err != nil { + return err + } + + if !r.Exists() { + return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String("storagerepo")) + } + + minerapi, err := node.New(ctx, + node.StorageMiner(), + node.Online(), + node.Repo(r), + + node.Override(node.SetApiEndpointKey, func(lr repo.LockedRepo) error { + apima, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("api")) + if err != nil { + return err + } + return lr.SetAPIEndpoint(apima) + }), + ) + if err != nil { + return err + } + // TODO: libp2p node log.Infof("Remote version %s", v) - return nil + + + rpcServer := jsonrpc.NewServer() + rpcServer.Register("Filecoin", minerapi) + http.Handle("/rpc/v0", rpcServer) + return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) }, } From 677f5c3e30eba44589212b6cc060a8e1d4d4c973 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 19 Jul 2019 12:15:22 +0200 Subject: [PATCH 07/21] Check full node version in storageminer init --- api/api.go | 6 ++++++ build/version.go | 17 +++++++++++++++++ cmd/lotus-storage-miner/init.go | 18 +++++++++++++++++- cmd/lotus-storage-miner/main.go | 3 ++- cmd/lotus-storage-miner/run.go | 2 +- 5 files changed, 43 insertions(+), 3 deletions(-) diff --git a/api/api.go b/api/api.go index ee5d3ea6b..25309b592 100644 --- a/api/api.go +++ b/api/api.go @@ -16,6 +16,12 @@ import ( type Version struct { Version string + // APIVersion is a binary encoded semver version of the remote implementing + // this api + // + // See APIVersion in build/version.go + APIVersion uint32 + // TODO: git commit / os / genesis cid? } diff --git a/build/version.go b/build/version.go index 39e2ca1e2..d9c45d991 100644 --- a/build/version.go +++ b/build/version.go @@ -2,3 +2,20 @@ package build // Version is the local build version, set by build system const Version = "0.0.0" + +// APIVersion is a hex semver version of the rpc api exposed +// +// M M P +// A I A +// J N T +// O O C +// R R H +// |\vv/| +// vv vv +const APIVersion = 0x000001 + +const ( + MajorMask = 0xff0000 + MinorMask = 0xffff00 + PatchMask = 0xffffff +) \ No newline at end of file diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 618e5de1e..4e8e6503b 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -4,6 +4,7 @@ import ( "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/go-lotus/build" lcli "github.com/filecoin-project/go-lotus/cli" "github.com/filecoin-project/go-lotus/node/repo" ) @@ -26,7 +27,7 @@ var initCmd = &cli.Command{ log.Info("Trying to connect to full node RPC") - api, err := lcli.GetAPI(cctx) + api, err := lcli.GetAPI(cctx) // TODO: consider storing full node address in config if err != nil { return err } @@ -34,10 +35,25 @@ var initCmd = &cli.Command{ log.Info("Checking full node version") + v, err := api.Version(ctx) + if err != nil { + return err + } + + if v.APIVersion & build.MinorMask != build.APIVersion & build.MinorMask { + return xerrors.Errorf("Remote API version didn't match (local %x, remote %x)", build.APIVersion, v.APIVersion) + } + + log.Info("Initializing repo") + if err := r.Init(); err != nil { return err } + // create actors and stuff + + log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'") + return nil }, } diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index a6122ff9c..cf32da73d 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -15,7 +15,8 @@ var log = logging.Logger("main") func main() { logging.SetLogLevel("*", "INFO") local := []*cli.Command{ - RunCmd, + runCmd, + initCmd, } app := &cli.App{ diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index be870378f..89174a815 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -13,7 +13,7 @@ import ( "github.com/filecoin-project/go-lotus/node/repo" ) -var RunCmd = &cli.Command{ +var runCmd = &cli.Command{ Name: "run", Usage: "Start a lotus storage miner process", Flags: []cli.Flag{ From 4f1946d5a23b31c63ddc5d5c8bb8e332588bf984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Jul 2019 23:50:52 +0200 Subject: [PATCH 08/21] Address review --- cmd/lotus-storage-miner/init.go | 4 ++-- cmd/lotus-storage-miner/main.go | 5 +++-- cmd/lotus-storage-miner/run.go | 9 ++------- cmd/lotus/daemon_nodaemon.go | 2 +- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 4e8e6503b..e1cc6adb9 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -16,13 +16,13 @@ var initCmd = &cli.Command{ log.Info("Initializing lotus storage miner") log.Info("Checking if repo exists") - r, err := repo.NewFS(cctx.String("storagerepo")) + r, err := repo.NewFS(cctx.String(FlagStorageRepo)) if err != nil { return err } if r.Exists() { - return xerrors.Errorf("repo at '%s' is already initialized", cctx.String("storagerepo")) + return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagStorageRepo)) } log.Info("Trying to connect to full node RPC") diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index cf32da73d..3954c4c3d 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -12,6 +12,8 @@ import ( var log = logging.Logger("main") +const FlagStorageRepo = "storagerepo" + func main() { logging.SetLogLevel("*", "INFO") local := []*cli.Command{ @@ -31,9 +33,8 @@ func main() { Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME }, &cli.StringFlag{ - Name: "storagerepo", + Name: FlagStorageRepo, EnvVars: []string{"LOTUS_STORAGE_PATH"}, - Hidden: true, Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME }, }, diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 89174a815..1c6aa0a6a 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -17,11 +17,6 @@ var runCmd = &cli.Command{ Name: "run", Usage: "Start a lotus storage miner process", Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "storagerepo", - EnvVars: []string{"LOTUS_STORAGE_PATH"}, - Value: "~/.lotusstorage", // TODO: Consider XDG_DATA_HOME - }, &cli.StringFlag{ Name: "api", Value: "2345", @@ -36,13 +31,13 @@ var runCmd = &cli.Command{ v, err := api.Version(ctx) - r, err := repo.NewFS(cctx.String("storagerepo")) + r, err := repo.NewFS(cctx.String(FlagStorageRepo)) if err != nil { return err } if !r.Exists() { - return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String("storagerepo")) + return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) } minerapi, err := node.New(ctx, diff --git a/cmd/lotus/daemon_nodaemon.go b/cmd/lotus/daemon_nodaemon.go index bbaaa4963..05a8f97da 100644 --- a/cmd/lotus/daemon_nodaemon.go +++ b/cmd/lotus/daemon_nodaemon.go @@ -9,7 +9,7 @@ import ( ) // DaemonCmd is the `go-lotus daemon` command -var Cmd = &cli.Command{ +var DaemonCmd = &cli.Command{ Name: "daemon", Usage: "Start a lotus daemon process", Flags: []cli.Flag{ From 4c8b0288876cfe1d02117fbe9db45aa2cf13a484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Jul 2019 23:54:54 +0200 Subject: [PATCH 09/21] Return error form Repo.Exists --- cmd/lotus-storage-miner/init.go | 6 +++++- cmd/lotus-storage-miner/run.go | 6 +++++- node/repo/fsrepo.go | 8 ++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index e1cc6adb9..b9a0618ab 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -21,7 +21,11 @@ var initCmd = &cli.Command{ return err } - if r.Exists() { + ok, err := r.Exists() + if err != nil { + return err + } + if ok { return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagStorageRepo)) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 1c6aa0a6a..16cd34515 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -36,7 +36,11 @@ var runCmd = &cli.Command{ return err } - if !r.Exists() { + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 72e0e6173..a506e6e8e 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -58,9 +58,13 @@ func NewFS(path string) (*FsRepo, error) { }, nil } -func (fsr *FsRepo) Exists() bool { +func (fsr *FsRepo) Exists() (bool, error) { _, err := os.Stat(fsr.path) - return err == nil + notexist := os.IsNotExist(err) + if notexist { + err = nil + } + return !notexist, err } func (fsr *FsRepo) Init() error { From 8d529d1ae7cc78c8d52526029415e765a4757d5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 23 Jul 2019 23:55:19 +0200 Subject: [PATCH 10/21] gofmt --- build/version.go | 2 +- cmd/lotus-storage-miner/init.go | 2 +- cmd/lotus-storage-miner/run.go | 1 - node/builder.go | 1 - 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/build/version.go b/build/version.go index d9c45d991..339a6bf8a 100644 --- a/build/version.go +++ b/build/version.go @@ -18,4 +18,4 @@ const ( MajorMask = 0xff0000 MinorMask = 0xffff00 PatchMask = 0xffffff -) \ No newline at end of file +) diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index b9a0618ab..c997e3062 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -44,7 +44,7 @@ var initCmd = &cli.Command{ return err } - if v.APIVersion & build.MinorMask != build.APIVersion & build.MinorMask { + if v.APIVersion&build.MinorMask != build.APIVersion&build.MinorMask { return xerrors.Errorf("Remote API version didn't match (local %x, remote %x)", build.APIVersion, v.APIVersion) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 16cd34515..fb8cabfa3 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -65,7 +65,6 @@ var runCmd = &cli.Command{ log.Infof("Remote version %s", v) - rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", minerapi) http.Handle("/rpc/v0", rpcServer) diff --git a/node/builder.go b/node/builder.go index aec19c14f..cbd15ca82 100644 --- a/node/builder.go +++ b/node/builder.go @@ -201,7 +201,6 @@ func Online() Option { // Storage miner - ) } From d0cbf02d36ed6a575fa965622c4e1ceef50f90c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 00:34:13 +0200 Subject: [PATCH 11/21] Don't build full API in storage miner --- cmd/lotus-storage-miner/run.go | 4 ++-- cmd/lotus/daemon.go | 7 ++++++- node/builder.go | 33 +++++++++++++++++++++------------ node/node_test.go | 3 ++- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index fb8cabfa3..76f1192d7 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -44,7 +44,7 @@ var runCmd = &cli.Command{ return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) } - minerapi, err := node.New(ctx, + err = node.New(ctx, node.StorageMiner(), node.Online(), node.Repo(r), @@ -66,7 +66,7 @@ var runCmd = &cli.Command{ log.Infof("Remote version %s", v) rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", minerapi) + //rpcServer.Register("Filecoin", minerapi) http.Handle("/rpc/v0", rpcServer) return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) }, diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 3ad9f4620..5fa7b0764 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -4,6 +4,8 @@ package main import ( "context" + + "github.com/filecoin-project/go-lotus/api" "github.com/multiformats/go-multiaddr" "gopkg.in/urfave/cli.v2" @@ -32,7 +34,10 @@ var DaemonCmd = &cli.Command{ return err } - api, err := node.New(ctx, + var api api.API + err = node.New(ctx, + node.FullAPI(&api), + node.Online(), node.Repo(r), diff --git a/node/builder.go b/node/builder.go index cbd15ca82..95e45b35a 100644 --- a/node/builder.go +++ b/node/builder.go @@ -71,6 +71,8 @@ const ( HandleIncomingMessagesKey // daemon + ExtractApiKey + SetApiEndpointKey _nInvokes // keep this last @@ -125,10 +127,6 @@ func defaults() []Option { return []Option{ Override(new(helpers.MetricsCtx), context.Background), Override(new(record.Validator), modules.RecordValidator), - - // Filecoin modules - - Override(new(*chain.ChainStore), chain.NewChainStore), } } @@ -165,7 +163,6 @@ func libp2p() Option { // Online sets up basic libp2p node func Online() Option { return Options( - Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), // make sure that online is applied before Config. // This is important because Config overrides some of Online units func(s *Settings) error { s.Online = true; return nil }, @@ -178,6 +175,12 @@ func Online() Option { // Full node ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, + // TODO: Fix offline mode + + Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), + + Override(new(*chain.ChainStore), chain.NewChainStore), + Override(new(blockstore.GCLocker), blockstore.NewGCLocker), Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), Override(new(exchange.Interface), modules.Bitswap), @@ -265,9 +268,17 @@ func Repo(r repo.Repo) Option { ) } +func FullAPI(out *api.API) Option { + return func(s *Settings) error { + resAPI := &API{} + s.invokes[ExtractApiKey] = fx.Extract(resAPI) + *out = resAPI + return nil + } +} + // New builds and starts new Filecoin node -func New(ctx context.Context, opts ...Option) (api.API, error) { - resAPI := &API{} +func New(ctx context.Context, opts ...Option) error { settings := Settings{ modules: map[interface{}]fx.Option{}, invokes: make([]fx.Option, _nInvokes), @@ -275,7 +286,7 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { // apply module options in the right order if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { - return nil, err + return err } // gather constructors for fx.Options @@ -295,8 +306,6 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { fx.Options(ctors...), fx.Options(settings.invokes...), - fx.Extract(resAPI), - fx.NopLogger, ) @@ -304,10 +313,10 @@ func New(ctx context.Context, opts ...Option) (api.API, error) { // on this context, and implement closing logic through lifecycles // correctly if err := app.Start(ctx); err != nil { - return nil, err + return err } - return resAPI, nil + return nil } // In-memory / testing diff --git a/node/node_test.go b/node/node_test.go index a6c6d4f1f..a02d588aa 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -24,7 +24,8 @@ func builder(t *testing.T, n int) []api.API { for i := 0; i < n; i++ { var err error - out[i], err = node.New(ctx, + err = node.New(ctx, + node.FullAPI(&out[i]), node.Online(), node.Repo(repo.NewMemory(nil)), MockHost(mn), From eda03095b05abb360d0005636da82e3d11660b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 02:09:34 +0200 Subject: [PATCH 12/21] Separate API for storageminer --- api/api.go | 78 ++++++++++----------------- api/client/client.go | 4 +- api/permissioned.go | 4 +- api/struct.go | 124 ++++++++++++++++++++++++------------------- api/test/test.go | 2 +- cli/cmd.go | 4 +- cmd/lotus/daemon.go | 2 +- cmd/lotus/rpc.go | 2 +- node/api.go | 2 +- node/builder.go | 2 +- node/node_test.go | 8 +-- 11 files changed, 114 insertions(+), 118 deletions(-) diff --git a/api/api.go b/api/api.go index 25309b592..f959cb455 100644 --- a/api/api.go +++ b/api/api.go @@ -3,13 +3,14 @@ package api import ( "context" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-filestore" - "github.com/libp2p/go-libp2p-core/peer" ) // Version provides various build-time information @@ -37,14 +38,30 @@ type MsgWait struct { Receipt types.MessageReceipt } -// API is a low-level interface to the Filecoin network -type API interface { +type Common interface { // Auth AuthVerify(ctx context.Context, token string) ([]string, error) AuthNew(ctx context.Context, perms []string) ([]byte, error) - // chain + // network + NetPeers(context.Context) ([]peer.AddrInfo, error) + NetConnect(context.Context, peer.AddrInfo) error + NetAddrsListen(context.Context) (peer.AddrInfo, error) + + // ID returns peerID of libp2p node backing this API + ID(context.Context) (peer.ID, error) + + // Version provides information about API provider + Version(context.Context) (Version, error) + +} + +// FullNode API is a low-level interface to the Filecoin network full node +type FullNode interface { + Common + + // chain ChainHead(context.Context) (*chain.TipSet, error) // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *chain.TipSet) ([]byte, error) @@ -54,35 +71,13 @@ type API interface { // messages - // // wait - // // send - // // status - // // mpool - // // // ls / show / rm MpoolPending(context.Context, *chain.TipSet) ([]*chain.SignedMessage, error) MpoolPush(context.Context, *chain.SignedMessage) error - // dag - - // // get block - // // (cli: show / info) - - // network - - NetPeers(context.Context) ([]peer.AddrInfo, error) - NetConnect(context.Context, peer.AddrInfo) error - NetAddrsListen(context.Context) (peer.AddrInfo, error) - // // ping - - // Struct + // FullNodeStruct // miner - // // create - // // owner - // // power - // // set-price - // // set-perrid MinerStart(context.Context, address.Address) error MinerCreateBlock(context.Context, address.Address, *chain.TipSet, []chain.Ticket, chain.ElectionProof, []*chain.SignedMessage) (*chain.BlockMsg, error) @@ -99,22 +94,6 @@ type API interface { // Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain... MpoolGetNonce(context.Context, address.Address) (uint64, error) - // // import - // // export - // // (on cli - cmd to list associations) - - // dht - - // // need ? - - // paych - - // // todo - - // retrieval - - // // retrieve piece - // Other // ClientImport imports file under the specified path into filestore @@ -127,10 +106,11 @@ type API interface { ClientListImports(ctx context.Context) ([]Import, error) //ClientListAsks() []Ask - - // ID returns peerID of libp2p node backing this API - ID(context.Context) (peer.ID, error) - - // Version provides information about API provider - Version(context.Context) (Version, error) +} + +// Full API is a low-level interface to the Filecoin network storage miner node +type StorageMiner interface { + Common + + } diff --git a/api/client/client.go b/api/client/client.go index 6a412956d..2d577608f 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -8,8 +8,8 @@ import ( ) // NewRPC creates a new http jsonrpc client. -func NewRPC(addr string, requestHeader http.Header) (api.API, error) { - var res api.Struct +func NewRPC(addr string, requestHeader http.Header) (api.FullNode, error) { + var res api.FullNodeStruct _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal, requestHeader) return &res, err } diff --git a/api/permissioned.go b/api/permissioned.go index 78616518a..62603ea79 100644 --- a/api/permissioned.go +++ b/api/permissioned.go @@ -27,8 +27,8 @@ func WithPerm(ctx context.Context, perms []string) context.Context { return context.WithValue(ctx, permCtxKey, perms) } -func Permissioned(a API) API { - var out Struct +func Permissioned(a FullNode) FullNode { + var out FullNodeStruct rint := reflect.ValueOf(&out.Internal).Elem() ra := reflect.ValueOf(a) diff --git a/api/struct.go b/api/struct.go index 3e238ab95..dc3153e30 100644 --- a/api/struct.go +++ b/api/struct.go @@ -14,15 +14,55 @@ import ( // All permissions are listed in permissioned.go var _ = AllPermissions -// Struct implements API passing calls to user-provided function values. -type Struct struct { - Internal struct { +type CommonStruct struct { + Internal struct{ AuthVerify func(ctx context.Context, token string) ([]string, error) `perm:"read"` AuthNew func(ctx context.Context, perms []string) ([]byte, error) `perm:"admin"` + NetPeers func(context.Context) ([]peer.AddrInfo, error) `perm:"read"` + NetConnect func(context.Context, peer.AddrInfo) error `perm:"write"` + NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"` + ID func(context.Context) (peer.ID, error) `perm:"read"` Version func(context.Context) (Version, error) `perm:"read"` + } +} +func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]string, error) { + return c.Internal.AuthVerify(ctx, token) +} + +func (c *CommonStruct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { + return c.Internal.AuthNew(ctx, perms) +} + +func (c *CommonStruct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { + return c.Internal.NetPeers(ctx) +} + +func (c *CommonStruct) NetConnect(ctx context.Context, p peer.AddrInfo) error { + return c.Internal.NetConnect(ctx, p) +} + +func (c *CommonStruct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { + return c.Internal.NetAddrsListen(ctx) +} + +// ID implements API.ID +func (c *CommonStruct) ID(ctx context.Context) (peer.ID, error) { + return c.Internal.ID(ctx) +} + +// Version implements API.Version +func (c *CommonStruct) Version(ctx context.Context) (Version, error) { + return c.Internal.Version(ctx) +} + +// FullNodeStruct implements API passing calls to user-provided function values. +type FullNodeStruct struct { + CommonStruct + + Internal struct { ChainSubmitBlock func(ctx context.Context, blk *chain.BlockMsg) error `perm:"write"` ChainHead func(context.Context) (*chain.TipSet, error) `perm:"read"` ChainGetRandomness func(context.Context, *chain.TipSet) ([]byte, error) `perm:"read"` @@ -45,113 +85,89 @@ type Struct struct { ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"` ClientListImports func(ctx context.Context) ([]Import, error) `perm:"read"` - - NetPeers func(context.Context) ([]peer.AddrInfo, error) `perm:"read"` - NetConnect func(context.Context, peer.AddrInfo) error `perm:"write"` - NetAddrsListen func(context.Context) (peer.AddrInfo, error) `perm:"read"` } } -func (c *Struct) AuthVerify(ctx context.Context, token string) ([]string, error) { - return c.Internal.AuthVerify(ctx, token) +type StorageMinerStruct struct { + CommonStruct + + Internal struct{ + + } } -func (c *Struct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { - return c.Internal.AuthNew(ctx, perms) -} - -func (c *Struct) ClientListImports(ctx context.Context) ([]Import, error) { +func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]Import, error) { return c.Internal.ClientListImports(ctx) } -func (c *Struct) ClientImport(ctx context.Context, path string) (cid.Cid, error) { +func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid, error) { return c.Internal.ClientImport(ctx, path) } -func (c *Struct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { +func (c *FullNodeStruct) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { return c.Internal.MpoolPending(ctx, ts) } -func (c *Struct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { +func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { return c.Internal.MpoolPush(ctx, smsg) } -func (c *Struct) MinerStart(ctx context.Context, addr address.Address) error { +func (c *FullNodeStruct) MinerStart(ctx context.Context, addr address.Address) error { return c.Internal.MinerStart(ctx, addr) } -func (c *Struct) MinerCreateBlock(ctx context.Context, addr address.Address, base *chain.TipSet, tickets []chain.Ticket, eproof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { +func (c *FullNodeStruct) MinerCreateBlock(ctx context.Context, addr address.Address, base *chain.TipSet, tickets []chain.Ticket, eproof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { return c.Internal.MinerCreateBlock(ctx, addr, base, tickets, eproof, msgs) } -func (c *Struct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { - return c.Internal.NetPeers(ctx) -} - -func (c *Struct) NetConnect(ctx context.Context, p peer.AddrInfo) error { - return c.Internal.NetConnect(ctx, p) -} - -func (c *Struct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { - return c.Internal.NetAddrsListen(ctx) -} - -func (c *Struct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { +func (c *FullNodeStruct) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { return c.Internal.ChainSubmitBlock(ctx, blk) } -func (c *Struct) ChainHead(ctx context.Context) (*chain.TipSet, error) { +func (c *FullNodeStruct) ChainHead(ctx context.Context) (*chain.TipSet, error) { return c.Internal.ChainHead(ctx) } -func (c *Struct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { +func (c *FullNodeStruct) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { return c.Internal.ChainGetRandomness(ctx, pts) } -func (c *Struct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { +func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { return c.Internal.ChainWaitMsg(ctx, msgc) } -// ID implements API.ID -func (c *Struct) ID(ctx context.Context) (peer.ID, error) { - return c.Internal.ID(ctx) -} - -// Version implements API.Version -func (c *Struct) Version(ctx context.Context) (Version, error) { - return c.Internal.Version(ctx) -} - -func (c *Struct) WalletNew(ctx context.Context, typ string) (address.Address, error) { +func (c *FullNodeStruct) WalletNew(ctx context.Context, typ string) (address.Address, error) { return c.Internal.WalletNew(ctx, typ) } -func (c *Struct) WalletList(ctx context.Context) ([]address.Address, error) { +func (c *FullNodeStruct) WalletList(ctx context.Context) ([]address.Address, error) { return c.Internal.WalletList(ctx) } -func (c *Struct) WalletBalance(ctx context.Context, a address.Address) (types.BigInt, error) { +func (c *FullNodeStruct) WalletBalance(ctx context.Context, a address.Address) (types.BigInt, error) { return c.Internal.WalletBalance(ctx, a) } -func (c *Struct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { +func (c *FullNodeStruct) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { return c.Internal.WalletSign(ctx, k, msg) } -func (c *Struct) WalletDefaultAddress(ctx context.Context) (address.Address, error) { +func (c *FullNodeStruct) WalletDefaultAddress(ctx context.Context) (address.Address, error) { return c.Internal.WalletDefaultAddress(ctx) } -func (c *Struct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { +func (c *FullNodeStruct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { return c.Internal.MpoolGetNonce(ctx, addr) } -func (c *Struct) ChainGetBlock(ctx context.Context, b cid.Cid) (*chain.BlockHeader, error) { +func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*chain.BlockHeader, error) { return c.Internal.ChainGetBlock(ctx, b) } -func (c *Struct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*chain.SignedMessage, error) { +func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) ([]*chain.SignedMessage, error) { return c.Internal.ChainGetBlockMessages(ctx, b) } -var _ API = &Struct{} +var _ Common = &CommonStruct{} +var _ FullNode = &FullNodeStruct{} +var _ StorageMiner = &StorageMinerStruct{} diff --git a/api/test/test.go b/api/test/test.go index 53a92f3c4..714063bf7 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -11,7 +11,7 @@ import ( // APIBuilder is a function which is invoked in test suite to provide // test nodes and networks -type APIBuilder func(t *testing.T, n int) []api.API +type APIBuilder func(t *testing.T, n int) []api.FullNode type testSuite struct { makeNodes APIBuilder } diff --git a/cli/cmd.go b/cli/cmd.go index 65de1a01f..587a35953 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -23,9 +23,9 @@ const ( ) // ApiConnector returns API instance -type ApiConnector func() api.API +type ApiConnector func() api.FullNode -func GetAPI(ctx *cli.Context) (api.API, error) { +func GetAPI(ctx *cli.Context) (api.FullNode, error) { r, err := repo.NewFS(ctx.String("repo")) if err != nil { return nil, err diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 5fa7b0764..b8a88bc33 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -34,7 +34,7 @@ var DaemonCmd = &cli.Command{ return err } - var api api.API + var api api.FullNode err = node.New(ctx, node.FullAPI(&api), diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 8801bd0c4..6d22e1f85 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -7,7 +7,7 @@ import ( "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) -func serveRPC(api api.API, addr string) error { +func serveRPC(api api.FullNode, addr string) error { rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api) http.Handle("/rpc/v0", rpcServer) diff --git a/node/api.go b/node/api.go index 07e4a1911..b4aa9870f 100644 --- a/node/api.go +++ b/node/api.go @@ -202,4 +202,4 @@ func (a *API) NetAddrsListen(context.Context) (peer.AddrInfo, error) { }, nil } -var _ api.API = &API{} +var _ api.FullNode = &API{} diff --git a/node/builder.go b/node/builder.go index 95e45b35a..bc3446cf0 100644 --- a/node/builder.go +++ b/node/builder.go @@ -268,7 +268,7 @@ func Repo(r repo.Repo) Option { ) } -func FullAPI(out *api.API) Option { +func FullAPI(out *api.FullNode) Option { return func(s *Settings) error { resAPI := &API{} s.invokes[ExtractApiKey] = fx.Extract(resAPI) diff --git a/node/node_test.go b/node/node_test.go index a02d588aa..7480158cc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -16,11 +16,11 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" ) -func builder(t *testing.T, n int) []api.API { +func builder(t *testing.T, n int) []api.FullNode { ctx := context.Background() mn := mocknet.New(ctx) - out := make([]api.API, n) + out := make([]api.FullNode, n) for i := 0; i < n; i++ { var err error @@ -48,9 +48,9 @@ func TestAPI(t *testing.T) { var nextApi int -func rpcBuilder(t *testing.T, n int) []api.API { +func rpcBuilder(t *testing.T, n int) []api.FullNode { nodeApis := builder(t, n) - out := make([]api.API, n) + out := make([]api.FullNode, n) for i, a := range nodeApis { rpcServer := jsonrpc.NewServer() From f0e807dabb077bc62eafa43cc1aa9ae1082b1b23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 02:40:19 +0200 Subject: [PATCH 13/21] Fix jsonrpc client after splitting apis --- api/client/client.go | 23 ++- cli/cmd.go | 2 +- cmd/lotus-storage-miner/run.go | 2 +- lib/jsonrpc/client.go | 288 +++++++++++++++++---------------- node/api.go | 50 +++--- node/builder.go | 2 +- node/node_test.go | 2 +- 7 files changed, 197 insertions(+), 172 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index 2d577608f..d7b862d94 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -7,9 +7,26 @@ import ( "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) -// NewRPC creates a new http jsonrpc client. -func NewRPC(addr string, requestHeader http.Header) (api.FullNode, error) { +// NewFullNodeRPC creates a new http jsonrpc client. +func NewFullNodeRPC(addr string, requestHeader http.Header) (api.FullNode, error) { var res api.FullNodeStruct - _, err := jsonrpc.NewClient(addr, "Filecoin", &res.Internal, requestHeader) + _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.CommonStruct.Internal, + &res.Internal, + }, requestHeader) + return &res, err } + +// NewStorageMinerRPC creates a new http jsonrpc client for storage miner +func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMiner, error) { + var res api.StorageMinerStruct + _, err := jsonrpc.NewMergeClient(addr, "Filecoin", + []interface{}{ + &res.CommonStruct.Internal, + &res.Internal, + }, requestHeader) + + return &res, err +} \ No newline at end of file diff --git a/cli/cmd.go b/cli/cmd.go index 587a35953..96857efac 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -48,7 +48,7 @@ func GetAPI(ctx *cli.Context) (api.FullNode, error) { headers.Add("Authorization", "Bearer "+string(token)) } - return client.NewRPC("ws://"+addr+"/rpc/v0", headers) + return client.NewFullNodeRPC("ws://"+addr+"/rpc/v0", headers) } // ReqContext returns context for cli execution. Calling it for the first time diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 76f1192d7..5b25642c7 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -66,7 +66,7 @@ var runCmd = &cli.Command{ log.Infof("Remote version %s", v) rpcServer := jsonrpc.NewServer() - //rpcServer.Register("Filecoin", minerapi) + rpcServer.Register("Filecoin", minerapi) http.Handle("/rpc/v0", rpcServer) return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) }, diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index aff8db320..c0efa7337 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -59,17 +59,12 @@ type ClientCloser func() // Returned value closes the client connection // TODO: Example func NewClient(addr string, namespace string, handler interface{}, requestHeader http.Header) (ClientCloser, error) { - htyp := reflect.TypeOf(handler) - if htyp.Kind() != reflect.Ptr { - return nil, xerrors.New("expected handler to be a pointer") - } - typ := htyp.Elem() - if typ.Kind() != reflect.Struct { - return nil, xerrors.New("handler should be a struct") - } - - val := reflect.ValueOf(handler) + return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader) +} +// NewMergeClient is like NewClient, but allows to specify multiple structs +// to be filled in the same namespace, using one connection +func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { var idCtr int64 conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) @@ -88,159 +83,172 @@ func NewClient(addr string, namespace string, handler interface{}, requestHeader stop: stop, }).handleWsConn(context.TODO()) - for i := 0; i < typ.NumField(); i++ { - f := typ.Field(i) - ftyp := f.Type - if ftyp.Kind() != reflect.Func { - return nil, xerrors.New("handler field not a func") + for _, handler := range outs { + htyp := reflect.TypeOf(handler) + if htyp.Kind() != reflect.Ptr { + return nil, xerrors.New("expected handler to be a pointer") + } + typ := htyp.Elem() + if typ.Kind() != reflect.Struct { + return nil, xerrors.New("handler should be a struct") } - valOut, errOut, nout := processFuncOut(ftyp) + val := reflect.ValueOf(handler) - processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = rval + for i := 0; i < typ.NumField(); i++ { + f := typ.Field(i) + ftyp := f.Type + if ftyp.Kind() != reflect.Func { + return nil, xerrors.New("handler field not a func") } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - if resp.Error != nil { - out[errOut].Set(reflect.ValueOf(resp.Error)) + + valOut, errOut, nout := processFuncOut(ftyp) + + processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { + out := make([]reflect.Value, nout) + + if valOut != -1 { + out[valOut] = rval } - } - - return out - } - - processError := func(err error) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = reflect.New(ftyp.Out(valOut)).Elem() - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - out[errOut].Set(reflect.ValueOf(&ErrClient{err})) - } - - return out - } - - hasCtx := 0 - if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { - hasCtx = 1 - } - retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan - - fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { - id := atomic.AddInt64(&idCtr, 1) - params := make([]param, len(args)-hasCtx) - for i, arg := range args[hasCtx:] { - params[i] = param{ - v: arg, + if errOut != -1 { + out[errOut] = reflect.New(errorType).Elem() + if resp.Error != nil { + out[errOut].Set(reflect.ValueOf(resp.Error)) + } } + + return out } - var ctx context.Context - if hasCtx == 1 { - ctx = args[0].Interface().(context.Context) + processError := func(err error) []reflect.Value { + out := make([]reflect.Value, nout) + + if valOut != -1 { + out[valOut] = reflect.New(ftyp.Out(valOut)).Elem() + } + if errOut != -1 { + out[errOut] = reflect.New(errorType).Elem() + out[errOut].Set(reflect.ValueOf(&ErrClient{err})) + } + + return out } - var retVal reflect.Value + hasCtx := 0 + if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { + hasCtx = 1 + } + retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan - // if the function returns a channel, we need to provide a sink for the - // messages - var chCtor func() (context.Context, func([]byte, bool)) + fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { + id := atomic.AddInt64(&idCtr, 1) + params := make([]param, len(args)-hasCtx) + for i, arg := range args[hasCtx:] { + params[i] = param{ + v: arg, + } + } - if retCh { - retVal = reflect.Zero(ftyp.Out(valOut)) + var ctx context.Context + if hasCtx == 1 { + ctx = args[0].Interface().(context.Context) + } - chCtor = func() (context.Context, func([]byte, bool)) { - // unpack chan type to make sure it's reflect.BothDir - ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) - ch := reflect.MakeChan(ctyp, 0) // todo: buffer? - retVal = ch.Convert(ftyp.Out(valOut)) + var retVal reflect.Value - return ctx, func(result []byte, ok bool) { - if !ok { - // remote channel closed, close ours too - ch.Close() - return + // if the function returns a channel, we need to provide a sink for the + // messages + var chCtor func() (context.Context, func([]byte, bool)) + + if retCh { + retVal = reflect.Zero(ftyp.Out(valOut)) + + chCtor = func() (context.Context, func([]byte, bool)) { + // unpack chan type to make sure it's reflect.BothDir + ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) + ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + retVal = ch.Convert(ftyp.Out(valOut)) + + return ctx, func(result []byte, ok bool) { + if !ok { + // remote channel closed, close ours too + ch.Close() + return + } + + val := reflect.New(ftyp.Out(valOut).Elem()) + if err := json.Unmarshal(result, val.Interface()); err != nil { + log.Errorf("error unmarshaling chan response: %s", err) + return + } + + ch.Send(val.Elem()) // todo: select on ctx is probably a good idea } + } + } - val := reflect.New(ftyp.Out(valOut).Elem()) - if err := json.Unmarshal(result, val.Interface()); err != nil { - log.Errorf("error unmarshaling chan response: %s", err) - return + req := request{ + Jsonrpc: "2.0", + ID: &id, + Method: namespace + "." + f.Name, + Params: params, + } + + rchan := make(chan clientResponse, 1) + requests <- clientRequest{ + req: req, + ready: rchan, + + retCh: chCtor, + } + var ctxDone <-chan struct{} + var resp clientResponse + + if ctx != nil { + ctxDone = ctx.Done() + } + + // wait for response, handle context cancellation + loop: + for { + select { + case resp = <-rchan: + break loop + case <-ctxDone: // send cancel request + ctxDone = nil + + requests <- clientRequest{ + req: request{ + Jsonrpc: "2.0", + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(id)}}, + }, } - - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea - } - } - } - - req := request{ - Jsonrpc: "2.0", - ID: &id, - Method: namespace + "." + f.Name, - Params: params, - } - - rchan := make(chan clientResponse, 1) - requests <- clientRequest{ - req: req, - ready: rchan, - - retCh: chCtor, - } - var ctxDone <-chan struct{} - var resp clientResponse - - if ctx != nil { - ctxDone = ctx.Done() - } - - // wait for response, handle context cancellation - loop: - for { - select { - case resp = <-rchan: - break loop - case <-ctxDone: // send cancel request - ctxDone = nil - - requests <- clientRequest{ - req: request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, - }, - } - } - } - - if valOut != -1 && !retCh { - retVal = reflect.New(ftyp.Out(valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { - return processError(xerrors.Errorf("unmarshaling result: %w", err)) } } - retVal = retVal.Elem() - } + if valOut != -1 && !retCh { + retVal = reflect.New(ftyp.Out(valOut)) - if resp.ID != *req.ID { - return processError(errors.New("request and response id didn't match")) - } + if resp.Result != nil { + log.Debugw("rpc result", "type", ftyp.Out(valOut)) + if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { + return processError(xerrors.Errorf("unmarshaling result: %w", err)) + } + } - return processResponse(resp, retVal) - }) + retVal = retVal.Elem() + } - val.Elem().Field(i).Set(fn) + if resp.ID != *req.ID { + return processError(errors.New("request and response id didn't match")) + } + + return processResponse(resp, retVal) + }) + + val.Elem().Field(i).Set(fn) + } } return func() { diff --git a/node/api.go b/node/api.go index b4aa9870f..3b5d85525 100644 --- a/node/api.go +++ b/node/api.go @@ -24,7 +24,7 @@ import ( var log = logging.Logger("node") -type API struct { +type FullNodeAPI struct { client.LocalStorage Host host.Host @@ -39,7 +39,7 @@ type jwtPayload struct { Allow []string } -func (a *API) AuthVerify(ctx context.Context, token string) ([]string, error) { +func (a *FullNodeAPI) AuthVerify(ctx context.Context, token string) ([]string, error) { var payload jwtPayload if _, err := jwt.Verify([]byte(token), (*jwt.HMACSHA)(a.APISecret), &payload); err != nil { return nil, xerrors.Errorf("JWT Verification failed: %w", err) @@ -48,7 +48,7 @@ func (a *API) AuthVerify(ctx context.Context, token string) ([]string, error) { return payload.Allow, nil } -func (a *API) AuthNew(ctx context.Context, perms []string) ([]byte, error) { +func (a *FullNodeAPI) AuthNew(ctx context.Context, perms []string) ([]byte, error) { p := jwtPayload{ Allow: perms, // TODO: consider checking validity } @@ -56,7 +56,7 @@ func (a *API) AuthNew(ctx context.Context, perms []string) ([]byte, error) { return jwt.Sign(&p, (*jwt.HMACSHA)(a.APISecret)) } -func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { +func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { if err := a.Chain.AddBlock(blk.Header); err != nil { return err } @@ -70,24 +70,24 @@ func (a *API) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { return a.PubSub.Publish("/fil/blocks", b) } -func (a *API) ChainHead(context.Context) (*chain.TipSet, error) { +func (a *FullNodeAPI) ChainHead(context.Context) (*chain.TipSet, error) { return a.Chain.GetHeaviestTipSet(), nil } -func (a *API) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { +func (a *FullNodeAPI) ChainGetRandomness(ctx context.Context, pts *chain.TipSet) ([]byte, error) { // TODO: this needs to look back in the chain for the right random beacon value return []byte("foo bar random"), nil } -func (a *API) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { +func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { panic("TODO") } -func (a *API) ChainGetBlock(ctx context.Context, msg cid.Cid) (*chain.BlockHeader, error) { +func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*chain.BlockHeader, error) { return a.Chain.GetBlock(msg) } -func (a *API) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*chain.SignedMessage, error) { +func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*chain.SignedMessage, error) { b, err := a.Chain.GetBlock(msg) if err != nil { return nil, err @@ -96,23 +96,23 @@ func (a *API) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([]*chain. return a.Chain.MessagesForBlock(b) } -func (a *API) ID(context.Context) (peer.ID, error) { +func (a *FullNodeAPI) ID(context.Context) (peer.ID, error) { return a.Host.ID(), nil } -func (a *API) Version(context.Context) (api.Version, error) { +func (a *FullNodeAPI) Version(context.Context) (api.Version, error) { return api.Version{ Version: build.Version, }, nil } -func (a *API) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { +func (a *FullNodeAPI) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { // TODO: need to make sure we don't return messages that were already included in the referenced chain // also need to accept ts == nil just fine, assume nil == chain.Head() return a.Mpool.Pending(), nil } -func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { +func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { msgb, err := smsg.Serialize() if err != nil { return err @@ -121,11 +121,11 @@ func (a *API) MpoolPush(ctx context.Context, smsg *chain.SignedMessage) error { return a.PubSub.Publish("/fil/messages", msgb) } -func (a *API) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { +func (a *FullNodeAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { return a.Mpool.GetNonce(addr) } -func (a *API) MinerStart(ctx context.Context, addr address.Address) error { +func (a *FullNodeAPI) MinerStart(ctx context.Context, addr address.Address) error { // hrm... m := miner.NewMiner(a, addr) @@ -134,7 +134,7 @@ func (a *API) MinerStart(ctx context.Context, addr address.Address) error { return nil } -func (a *API) MinerCreateBlock(ctx context.Context, addr address.Address, parents *chain.TipSet, tickets []chain.Ticket, proof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { +func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address, parents *chain.TipSet, tickets []chain.Ticket, proof chain.ElectionProof, msgs []*chain.SignedMessage) (*chain.BlockMsg, error) { fblk, err := chain.MinerCreateBlock(a.Chain, addr, parents, tickets, proof, msgs) if err != nil { return nil, err @@ -149,7 +149,7 @@ func (a *API) MinerCreateBlock(ctx context.Context, addr address.Address, parent return &out, nil } -func (a *API) NetPeers(context.Context) ([]peer.AddrInfo, error) { +func (a *FullNodeAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) { conns := a.Host.Network().Conns() out := make([]peer.AddrInfo, len(conns)) @@ -165,23 +165,23 @@ func (a *API) NetPeers(context.Context) ([]peer.AddrInfo, error) { return out, nil } -func (a *API) WalletNew(ctx context.Context, typ string) (address.Address, error) { +func (a *FullNodeAPI) WalletNew(ctx context.Context, typ string) (address.Address, error) { return a.Wallet.GenerateKey(typ) } -func (a *API) WalletList(ctx context.Context) ([]address.Address, error) { +func (a *FullNodeAPI) WalletList(ctx context.Context) ([]address.Address, error) { return a.Wallet.ListAddrs() } -func (a *API) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { +func (a *FullNodeAPI) WalletBalance(ctx context.Context, addr address.Address) (types.BigInt, error) { return a.Chain.GetBalance(addr) } -func (a *API) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { +func (a *FullNodeAPI) WalletSign(ctx context.Context, k address.Address, msg []byte) (*chain.Signature, error) { return a.Wallet.Sign(k, msg) } -func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) { +func (a *FullNodeAPI) WalletDefaultAddress(ctx context.Context) (address.Address, error) { addrs, err := a.Wallet.ListAddrs() if err != nil { return address.Undef, err @@ -191,15 +191,15 @@ func (a *API) WalletDefaultAddress(ctx context.Context) (address.Address, error) return addrs[0], nil } -func (a *API) NetConnect(ctx context.Context, p peer.AddrInfo) error { +func (a *FullNodeAPI) NetConnect(ctx context.Context, p peer.AddrInfo) error { return a.Host.Connect(ctx, p) } -func (a *API) NetAddrsListen(context.Context) (peer.AddrInfo, error) { +func (a *FullNodeAPI) NetAddrsListen(context.Context) (peer.AddrInfo, error) { return peer.AddrInfo{ ID: a.Host.ID(), Addrs: a.Host.Addrs(), }, nil } -var _ api.FullNode = &API{} +var _ api.FullNode = &FullNodeAPI{} diff --git a/node/builder.go b/node/builder.go index bc3446cf0..2637a1536 100644 --- a/node/builder.go +++ b/node/builder.go @@ -270,7 +270,7 @@ func Repo(r repo.Repo) Option { func FullAPI(out *api.FullNode) Option { return func(s *Settings) error { - resAPI := &API{} + resAPI := &FullNodeAPI{} s.invokes[ExtractApiKey] = fx.Extract(resAPI) *out = resAPI return nil diff --git a/node/node_test.go b/node/node_test.go index 7480158cc..2a49f6152 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -58,7 +58,7 @@ func rpcBuilder(t *testing.T, n int) []api.FullNode { testServ := httptest.NewServer(rpcServer) // todo: close var err error - out[i], err = client.NewRPC("ws://"+testServ.Listener.Addr().String(), nil) + out[i], err = client.NewFullNodeRPC("ws://"+testServ.Listener.Addr().String(), nil) if err != nil { t.Fatal(err) } From 772dd6c549617d6c1fdb86fbb040c2b343649b40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 02:58:31 +0200 Subject: [PATCH 14/21] Implement storage miner API in node --- api/struct.go | 60 ++++++++++++------------ cmd/lotus-storage-miner/run.go | 8 ++-- node/builder.go | 9 +++- node/commonapi.go | 83 ++++++++++++++++++++++++++++++++++ node/{api.go => fullapi.go} | 69 +--------------------------- node/storminerapi.go | 11 +++++ 6 files changed, 139 insertions(+), 101 deletions(-) create mode 100644 node/commonapi.go rename node/{api.go => fullapi.go} (69%) create mode 100644 node/storminerapi.go diff --git a/api/struct.go b/api/struct.go index dc3153e30..08f03e9f1 100644 --- a/api/struct.go +++ b/api/struct.go @@ -28,36 +28,6 @@ type CommonStruct struct { } } -func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]string, error) { - return c.Internal.AuthVerify(ctx, token) -} - -func (c *CommonStruct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { - return c.Internal.AuthNew(ctx, perms) -} - -func (c *CommonStruct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { - return c.Internal.NetPeers(ctx) -} - -func (c *CommonStruct) NetConnect(ctx context.Context, p peer.AddrInfo) error { - return c.Internal.NetConnect(ctx, p) -} - -func (c *CommonStruct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { - return c.Internal.NetAddrsListen(ctx) -} - -// ID implements API.ID -func (c *CommonStruct) ID(ctx context.Context) (peer.ID, error) { - return c.Internal.ID(ctx) -} - -// Version implements API.Version -func (c *CommonStruct) Version(ctx context.Context) (Version, error) { - return c.Internal.Version(ctx) -} - // FullNodeStruct implements API passing calls to user-provided function values. type FullNodeStruct struct { CommonStruct @@ -96,6 +66,36 @@ type StorageMinerStruct struct { } } +func (c *CommonStruct) AuthVerify(ctx context.Context, token string) ([]string, error) { + return c.Internal.AuthVerify(ctx, token) +} + +func (c *CommonStruct) AuthNew(ctx context.Context, perms []string) ([]byte, error) { + return c.Internal.AuthNew(ctx, perms) +} + +func (c *CommonStruct) NetPeers(ctx context.Context) ([]peer.AddrInfo, error) { + return c.Internal.NetPeers(ctx) +} + +func (c *CommonStruct) NetConnect(ctx context.Context, p peer.AddrInfo) error { + return c.Internal.NetConnect(ctx, p) +} + +func (c *CommonStruct) NetAddrsListen(ctx context.Context) (peer.AddrInfo, error) { + return c.Internal.NetAddrsListen(ctx) +} + +// ID implements API.ID +func (c *CommonStruct) ID(ctx context.Context) (peer.ID, error) { + return c.Internal.ID(ctx) +} + +// Version implements API.Version +func (c *CommonStruct) Version(ctx context.Context) (Version, error) { + return c.Internal.Version(ctx) +} + func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]Import, error) { return c.Internal.ClientListImports(ctx) } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 5b25642c7..29396b7d8 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -7,6 +7,7 @@ import ( "golang.org/x/xerrors" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/go-lotus/api" lcli "github.com/filecoin-project/go-lotus/cli" "github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/node" @@ -23,13 +24,13 @@ var runCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - api, err := lcli.GetAPI(cctx) + nodeApi, err := lcli.GetAPI(cctx) if err != nil { return err } ctx := lcli.ReqContext(cctx) - v, err := api.Version(ctx) + v, err := nodeApi.Version(ctx) r, err := repo.NewFS(cctx.String(FlagStorageRepo)) if err != nil { @@ -44,8 +45,9 @@ var runCmd = &cli.Command{ return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-storage-miner init' to set it up", cctx.String(FlagStorageRepo)) } + var minerapi api.StorageMiner err = node.New(ctx, - node.StorageMiner(), + node.StorageMiner(&minerapi), node.Online(), node.Repo(r), diff --git a/node/builder.go b/node/builder.go index 2637a1536..2813a55d1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -207,7 +207,7 @@ func Online() Option { ) } -func StorageMiner() Option { +func StorageMiner(out *api.StorageMiner) Option { return Options( ApplyIf(func(s *Settings) bool { return s.Config }, Error(errors.New("the StorageMiner option must be set before Config option")), @@ -220,6 +220,13 @@ func StorageMiner() Option { s.nodeType = nodeStorageMiner return nil }, + + func(s *Settings) error { + resAPI := &StorageMinerAPI{} + s.invokes[ExtractApiKey] = fx.Extract(resAPI) + *out = resAPI + return nil + }, ) } diff --git a/node/commonapi.go b/node/commonapi.go new file mode 100644 index 000000000..5e3f2ac89 --- /dev/null +++ b/node/commonapi.go @@ -0,0 +1,83 @@ +package node + +import ( + "context" + + "github.com/gbrlsnchs/jwt/v3" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + ma "github.com/multiformats/go-multiaddr" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/node/modules" +) + +type CommonAPI struct { + fx.In + + APISecret *modules.APIAlg + Host host.Host +} + +type jwtPayload struct { + Allow []string +} + +func (a *CommonAPI) AuthVerify(ctx context.Context, token string) ([]string, error) { + var payload jwtPayload + if _, err := jwt.Verify([]byte(token), (*jwt.HMACSHA)(a.APISecret), &payload); err != nil { + return nil, xerrors.Errorf("JWT Verification failed: %w", err) + } + + return payload.Allow, nil +} + +func (a *CommonAPI) AuthNew(ctx context.Context, perms []string) ([]byte, error) { + p := jwtPayload{ + Allow: perms, // TODO: consider checking validity + } + + return jwt.Sign(&p, (*jwt.HMACSHA)(a.APISecret)) +} + +func (a *CommonAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) { + conns := a.Host.Network().Conns() + out := make([]peer.AddrInfo, len(conns)) + + for i, conn := range conns { + out[i] = peer.AddrInfo{ + ID: conn.RemotePeer(), + Addrs: []ma.Multiaddr{ + conn.RemoteMultiaddr(), + }, + } + } + + return out, nil +} + +func (a *CommonAPI) NetConnect(ctx context.Context, p peer.AddrInfo) error { + return a.Host.Connect(ctx, p) +} + +func (a *CommonAPI) NetAddrsListen(context.Context) (peer.AddrInfo, error) { + return peer.AddrInfo{ + ID: a.Host.ID(), + Addrs: a.Host.Addrs(), + }, nil +} + +func (a *CommonAPI) ID(context.Context) (peer.ID, error) { + return a.Host.ID(), nil +} + +func (a *CommonAPI) Version(context.Context) (api.Version, error) { + return api.Version{ + Version: build.Version, + }, nil +} + +var _ api.Common = &CommonAPI{} diff --git a/node/api.go b/node/fullapi.go similarity index 69% rename from node/api.go rename to node/fullapi.go index 3b5d85525..fea0f3689 100644 --- a/node/api.go +++ b/node/fullapi.go @@ -4,22 +4,15 @@ import ( "context" "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/build" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/miner" "github.com/filecoin-project/go-lotus/node/client" - "github.com/filecoin-project/go-lotus/node/modules" - "github.com/gbrlsnchs/jwt/v3" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" - ma "github.com/multiformats/go-multiaddr" - "golang.org/x/xerrors" ) var log = logging.Logger("node") @@ -27,33 +20,12 @@ var log = logging.Logger("node") type FullNodeAPI struct { client.LocalStorage - Host host.Host + CommonAPI + Chain *chain.ChainStore PubSub *pubsub.PubSub Mpool *chain.MessagePool Wallet *chain.Wallet - APISecret *modules.APIAlg -} - -type jwtPayload struct { - Allow []string -} - -func (a *FullNodeAPI) AuthVerify(ctx context.Context, token string) ([]string, error) { - var payload jwtPayload - if _, err := jwt.Verify([]byte(token), (*jwt.HMACSHA)(a.APISecret), &payload); err != nil { - return nil, xerrors.Errorf("JWT Verification failed: %w", err) - } - - return payload.Allow, nil -} - -func (a *FullNodeAPI) AuthNew(ctx context.Context, perms []string) ([]byte, error) { - p := jwtPayload{ - Allow: perms, // TODO: consider checking validity - } - - return jwt.Sign(&p, (*jwt.HMACSHA)(a.APISecret)) } func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { @@ -96,16 +68,6 @@ func (a *FullNodeAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) ([ return a.Chain.MessagesForBlock(b) } -func (a *FullNodeAPI) ID(context.Context) (peer.ID, error) { - return a.Host.ID(), nil -} - -func (a *FullNodeAPI) Version(context.Context) (api.Version, error) { - return api.Version{ - Version: build.Version, - }, nil -} - func (a *FullNodeAPI) MpoolPending(ctx context.Context, ts *chain.TipSet) ([]*chain.SignedMessage, error) { // TODO: need to make sure we don't return messages that were already included in the referenced chain // also need to accept ts == nil just fine, assume nil == chain.Head() @@ -149,22 +111,6 @@ func (a *FullNodeAPI) MinerCreateBlock(ctx context.Context, addr address.Address return &out, nil } -func (a *FullNodeAPI) NetPeers(context.Context) ([]peer.AddrInfo, error) { - conns := a.Host.Network().Conns() - out := make([]peer.AddrInfo, len(conns)) - - for i, conn := range conns { - out[i] = peer.AddrInfo{ - ID: conn.RemotePeer(), - Addrs: []ma.Multiaddr{ - conn.RemoteMultiaddr(), - }, - } - } - - return out, nil -} - func (a *FullNodeAPI) WalletNew(ctx context.Context, typ string) (address.Address, error) { return a.Wallet.GenerateKey(typ) } @@ -191,15 +137,4 @@ func (a *FullNodeAPI) WalletDefaultAddress(ctx context.Context) (address.Address return addrs[0], nil } -func (a *FullNodeAPI) NetConnect(ctx context.Context, p peer.AddrInfo) error { - return a.Host.Connect(ctx, p) -} - -func (a *FullNodeAPI) NetAddrsListen(context.Context) (peer.AddrInfo, error) { - return peer.AddrInfo{ - ID: a.Host.ID(), - Addrs: a.Host.Addrs(), - }, nil -} - var _ api.FullNode = &FullNodeAPI{} diff --git a/node/storminerapi.go b/node/storminerapi.go new file mode 100644 index 000000000..4108a3e73 --- /dev/null +++ b/node/storminerapi.go @@ -0,0 +1,11 @@ +package node + +import ( + "github.com/filecoin-project/go-lotus/api" +) + +type StorageMinerAPI struct { + CommonAPI +} + +var _ api.StorageMiner = &StorageMinerAPI{} From c6b4fadba1337445bfcb2e85ae43052c702c6d1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 03:10:26 +0200 Subject: [PATCH 15/21] Fix permissions after split and rebase --- api/api.go | 3 --- api/client/client.go | 2 +- api/permissioned.go | 22 ++++++++++++++++------ api/struct.go | 5 ++--- cmd/lotus-storage-miner/run.go | 2 +- cmd/lotus/rpc.go | 4 ++-- node/fullapi.go | 8 ++++---- 7 files changed, 26 insertions(+), 20 deletions(-) diff --git a/api/api.go b/api/api.go index f959cb455..4b308520c 100644 --- a/api/api.go +++ b/api/api.go @@ -54,7 +54,6 @@ type Common interface { // Version provides information about API provider Version(context.Context) (Version, error) - } // FullNode API is a low-level interface to the Filecoin network full node @@ -111,6 +110,4 @@ type FullNode interface { // Full API is a low-level interface to the Filecoin network storage miner node type StorageMiner interface { Common - - } diff --git a/api/client/client.go b/api/client/client.go index d7b862d94..1bf04d6a2 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -29,4 +29,4 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine }, requestHeader) return &res, err -} \ No newline at end of file +} diff --git a/api/permissioned.go b/api/permissioned.go index 62603ea79..71c93f8e7 100644 --- a/api/permissioned.go +++ b/api/permissioned.go @@ -27,11 +27,23 @@ func WithPerm(ctx context.Context, perms []string) context.Context { return context.WithValue(ctx, permCtxKey, perms) } -func Permissioned(a FullNode) FullNode { - var out FullNodeStruct +func PermissionedStorMinerAPI(a StorageMiner) StorageMiner { + var out StorageMinerStruct + permissionedAny(a, &out.Internal) + permissionedAny(a, &out.CommonStruct.Internal) + return &out +} - rint := reflect.ValueOf(&out.Internal).Elem() - ra := reflect.ValueOf(a) +func PermissionedFullAPI(a FullNode) FullNode { + var out FullNodeStruct + permissionedAny(a, &out.Internal) + permissionedAny(a, &out.CommonStruct.Internal) + return &out +} + +func permissionedAny(in interface{}, out interface{}) { + rint := reflect.ValueOf(out).Elem() + ra := reflect.ValueOf(in) for f := 0; f < rint.NumField(); f++ { field := rint.Type().Field(f) @@ -81,6 +93,4 @@ func Permissioned(a FullNode) FullNode { })) } - - return &out } diff --git a/api/struct.go b/api/struct.go index 08f03e9f1..d9a629b35 100644 --- a/api/struct.go +++ b/api/struct.go @@ -15,7 +15,7 @@ import ( var _ = AllPermissions type CommonStruct struct { - Internal struct{ + Internal struct { AuthVerify func(ctx context.Context, token string) ([]string, error) `perm:"read"` AuthNew func(ctx context.Context, perms []string) ([]byte, error) `perm:"admin"` @@ -61,8 +61,7 @@ type FullNodeStruct struct { type StorageMinerStruct struct { CommonStruct - Internal struct{ - + Internal struct { } } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 29396b7d8..29f924689 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -68,7 +68,7 @@ var runCmd = &cli.Command{ log.Infof("Remote version %s", v) rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", minerapi) + rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) http.Handle("/rpc/v0", rpcServer) return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) }, diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 6d22e1f85..096b9b750 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -7,9 +7,9 @@ import ( "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) -func serveRPC(api api.FullNode, addr string) error { +func serveRPC(a api.FullNode, addr string) error { rpcServer := jsonrpc.NewServer() - rpcServer.Register("Filecoin", api) + rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) http.Handle("/rpc/v0", rpcServer) return http.ListenAndServe(addr, http.DefaultServeMux) } diff --git a/node/fullapi.go b/node/fullapi.go index fea0f3689..5b79c5020 100644 --- a/node/fullapi.go +++ b/node/fullapi.go @@ -22,10 +22,10 @@ type FullNodeAPI struct { CommonAPI - Chain *chain.ChainStore - PubSub *pubsub.PubSub - Mpool *chain.MessagePool - Wallet *chain.Wallet + Chain *chain.ChainStore + PubSub *pubsub.PubSub + Mpool *chain.MessagePool + Wallet *chain.Wallet } func (a *FullNodeAPI) ChainSubmitBlock(ctx context.Context, blk *chain.BlockMsg) error { From 33dc14c9f2669a53075b09d60957bd06451e5c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 03:13:56 +0200 Subject: [PATCH 16/21] Move API implementations to a separate package --- node/builder.go | 5 +++-- node/{commonapi.go => impl/common.go} | 2 +- node/{fullapi.go => impl/full.go} | 2 +- node/{storminerapi.go => impl/storminer.go} | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) rename node/{commonapi.go => impl/common.go} (99%) rename node/{fullapi.go => impl/full.go} (99%) rename node/{storminerapi.go => impl/storminer.go} (91%) diff --git a/node/builder.go b/node/builder.go index 2813a55d1..0c5326cad 100644 --- a/node/builder.go +++ b/node/builder.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/node/config" "github.com/filecoin-project/go-lotus/node/hello" + "github.com/filecoin-project/go-lotus/node/impl" "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/lp2p" @@ -222,7 +223,7 @@ func StorageMiner(out *api.StorageMiner) Option { }, func(s *Settings) error { - resAPI := &StorageMinerAPI{} + resAPI := &impl.StorageMinerAPI{} s.invokes[ExtractApiKey] = fx.Extract(resAPI) *out = resAPI return nil @@ -277,7 +278,7 @@ func Repo(r repo.Repo) Option { func FullAPI(out *api.FullNode) Option { return func(s *Settings) error { - resAPI := &FullNodeAPI{} + resAPI := &impl.FullNodeAPI{} s.invokes[ExtractApiKey] = fx.Extract(resAPI) *out = resAPI return nil diff --git a/node/commonapi.go b/node/impl/common.go similarity index 99% rename from node/commonapi.go rename to node/impl/common.go index 5e3f2ac89..9be1915ae 100644 --- a/node/commonapi.go +++ b/node/impl/common.go @@ -1,4 +1,4 @@ -package node +package impl import ( "context" diff --git a/node/fullapi.go b/node/impl/full.go similarity index 99% rename from node/fullapi.go rename to node/impl/full.go index 5b79c5020..94f6249e7 100644 --- a/node/fullapi.go +++ b/node/impl/full.go @@ -1,4 +1,4 @@ -package node +package impl import ( "context" diff --git a/node/storminerapi.go b/node/impl/storminer.go similarity index 91% rename from node/storminerapi.go rename to node/impl/storminer.go index 4108a3e73..197ca710a 100644 --- a/node/storminerapi.go +++ b/node/impl/storminer.go @@ -1,4 +1,4 @@ -package node +package impl import ( "github.com/filecoin-project/go-lotus/api" From 81d7c30facff0ba5e24cdcadc619d1ab03b1135f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 03:16:17 +0200 Subject: [PATCH 17/21] Couple lint fixes --- lib/jsonrpc/server.go | 3 ++- node/modules/core.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index 009710f85..c6b21a3da 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -3,9 +3,10 @@ package jsonrpc import ( "context" "encoding/json" - "github.com/gorilla/websocket" "io" "net/http" + + "github.com/gorilla/websocket" ) const ( diff --git a/node/modules/core.go b/node/modules/core.go index 47106019a..18ccf453d 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -3,13 +3,11 @@ package modules import ( "context" "crypto/rand" - "github.com/filecoin-project/go-lotus/api" - "github.com/gbrlsnchs/jwt/v3" - "golang.org/x/xerrors" "io" "io/ioutil" "path/filepath" + "github.com/gbrlsnchs/jwt/v3" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" @@ -27,7 +25,9 @@ import ( "github.com/libp2p/go-libp2p-core/routing" record "github.com/libp2p/go-libp2p-record" "go.uber.org/fx" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/types" "github.com/filecoin-project/go-lotus/node/modules/helpers" From 4fa9e45eb7cdcdf65a8aaf1fe80ee3ef37fca4f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 13:20:00 +0200 Subject: [PATCH 18/21] repo: check for config in exists --- cmd/lotus-storage-miner/run.go | 3 +++ node/repo/fsrepo.go | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 29f924689..3368c8f78 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -31,6 +31,9 @@ var runCmd = &cli.Command{ ctx := lcli.ReqContext(cctx) v, err := nodeApi.Version(ctx) + if err != nil { + return err + } r, err := repo.NewFS(cctx.String(FlagStorageRepo)) if err != nil { diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index a506e6e8e..2ce0c0810 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -59,7 +59,7 @@ func NewFS(path string) (*FsRepo, error) { } func (fsr *FsRepo) Exists() (bool, error) { - _, err := os.Stat(fsr.path) + _, err := os.Stat(filepath.Join(fsr.path, fsConfig)) notexist := os.IsNotExist(err) if notexist { err = nil @@ -79,6 +79,14 @@ func (fsr *FsRepo) Init() error { if err != nil { return err } + c, err := os.Create(filepath.Join(fsr.path, fsConfig)) + if err != nil { + return err + } + if err := c.Close(); err != nil { + return err + } + return fsr.initKeystore() } From 77bc2431100f25dbfecb4464f8dac5e5d247d682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 14:20:57 +0200 Subject: [PATCH 19/21] jsonrpc: Break NewMergeClient into smaller functions --- lib/jsonrpc/client.go | 357 ++++++++++++++++++++++++------------------ 1 file changed, 202 insertions(+), 155 deletions(-) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index c0efa7337..448418c67 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -3,7 +3,6 @@ package jsonrpc import ( "context" "encoding/json" - "errors" "fmt" "net/http" "reflect" @@ -42,12 +41,14 @@ type clientResponse struct { Error *respError `json:"error,omitempty"` } +type makeChanSink func() (context.Context, func([]byte, bool)) + type clientRequest struct { req request ready chan clientResponse // retCh provides a context and sink for handling incoming channel messages - retCh func() (context.Context, func([]byte, bool)) + retCh makeChanSink } // ClientCloser is used to close Client from further use @@ -62,24 +63,33 @@ func NewClient(addr string, namespace string, handler interface{}, requestHeader return NewMergeClient(addr, namespace, []interface{}{handler}, requestHeader) } +type client struct { + namespace string + + requests chan clientRequest + idCtr int64 +} + // NewMergeClient is like NewClient, but allows to specify multiple structs // to be filled in the same namespace, using one connection func NewMergeClient(addr string, namespace string, outs []interface{}, requestHeader http.Header) (ClientCloser, error) { - var idCtr int64 - conn, _, err := websocket.DefaultDialer.Dial(addr, requestHeader) if err != nil { return nil, err } + c := client{ + namespace: namespace, + } + stop := make(chan struct{}) - requests := make(chan clientRequest) + c.requests = make(chan clientRequest) handlers := map[string]rpcHandler{} go (&wsConn{ conn: conn, handler: handlers, - requests: requests, + requests: c.requests, stop: stop, }).handleWsConn(context.TODO()) @@ -96,157 +106,11 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe val := reflect.ValueOf(handler) for i := 0; i < typ.NumField(); i++ { - f := typ.Field(i) - ftyp := f.Type - if ftyp.Kind() != reflect.Func { - return nil, xerrors.New("handler field not a func") + fn, err := c.makeRpcFunc(typ.Field(i)) + if err != nil { + return nil, err } - valOut, errOut, nout := processFuncOut(ftyp) - - processResponse := func(resp clientResponse, rval reflect.Value) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = rval - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - if resp.Error != nil { - out[errOut].Set(reflect.ValueOf(resp.Error)) - } - } - - return out - } - - processError := func(err error) []reflect.Value { - out := make([]reflect.Value, nout) - - if valOut != -1 { - out[valOut] = reflect.New(ftyp.Out(valOut)).Elem() - } - if errOut != -1 { - out[errOut] = reflect.New(errorType).Elem() - out[errOut].Set(reflect.ValueOf(&ErrClient{err})) - } - - return out - } - - hasCtx := 0 - if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { - hasCtx = 1 - } - retCh := valOut != -1 && ftyp.Out(valOut).Kind() == reflect.Chan - - fn := reflect.MakeFunc(ftyp, func(args []reflect.Value) (results []reflect.Value) { - id := atomic.AddInt64(&idCtr, 1) - params := make([]param, len(args)-hasCtx) - for i, arg := range args[hasCtx:] { - params[i] = param{ - v: arg, - } - } - - var ctx context.Context - if hasCtx == 1 { - ctx = args[0].Interface().(context.Context) - } - - var retVal reflect.Value - - // if the function returns a channel, we need to provide a sink for the - // messages - var chCtor func() (context.Context, func([]byte, bool)) - - if retCh { - retVal = reflect.Zero(ftyp.Out(valOut)) - - chCtor = func() (context.Context, func([]byte, bool)) { - // unpack chan type to make sure it's reflect.BothDir - ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) - ch := reflect.MakeChan(ctyp, 0) // todo: buffer? - retVal = ch.Convert(ftyp.Out(valOut)) - - return ctx, func(result []byte, ok bool) { - if !ok { - // remote channel closed, close ours too - ch.Close() - return - } - - val := reflect.New(ftyp.Out(valOut).Elem()) - if err := json.Unmarshal(result, val.Interface()); err != nil { - log.Errorf("error unmarshaling chan response: %s", err) - return - } - - ch.Send(val.Elem()) // todo: select on ctx is probably a good idea - } - } - } - - req := request{ - Jsonrpc: "2.0", - ID: &id, - Method: namespace + "." + f.Name, - Params: params, - } - - rchan := make(chan clientResponse, 1) - requests <- clientRequest{ - req: req, - ready: rchan, - - retCh: chCtor, - } - var ctxDone <-chan struct{} - var resp clientResponse - - if ctx != nil { - ctxDone = ctx.Done() - } - - // wait for response, handle context cancellation - loop: - for { - select { - case resp = <-rchan: - break loop - case <-ctxDone: // send cancel request - ctxDone = nil - - requests <- clientRequest{ - req: request{ - Jsonrpc: "2.0", - Method: wsCancel, - Params: []param{{v: reflect.ValueOf(id)}}, - }, - } - } - } - - if valOut != -1 && !retCh { - retVal = reflect.New(ftyp.Out(valOut)) - - if resp.Result != nil { - log.Debugw("rpc result", "type", ftyp.Out(valOut)) - if err := json.Unmarshal(resp.Result, retVal.Interface()); err != nil { - return processError(xerrors.Errorf("unmarshaling result: %w", err)) - } - } - - retVal = retVal.Elem() - } - - if resp.ID != *req.ID { - return processError(errors.New("request and response id didn't match")) - } - - return processResponse(resp, retVal) - }) - val.Elem().Field(i).Set(fn) } } @@ -255,3 +119,186 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe close(stop) }, nil } + +func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) (func() reflect.Value, makeChanSink) { + retVal := reflect.Zero(ftyp.Out(valOut)) + + chCtor := func() (context.Context, func([]byte, bool)) { + // unpack chan type to make sure it's reflect.BothDir + ctyp := reflect.ChanOf(reflect.BothDir, ftyp.Out(valOut).Elem()) + ch := reflect.MakeChan(ctyp, 0) // todo: buffer? + retVal = ch.Convert(ftyp.Out(valOut)) + + return ctx, func(result []byte, ok bool) { + if !ok { + // remote channel closed, close ours too + ch.Close() + return + } + + val := reflect.New(ftyp.Out(valOut).Elem()) + if err := json.Unmarshal(result, val.Interface()); err != nil { + log.Errorf("error unmarshaling chan response: %s", err) + return + } + + ch.Send(val.Elem()) // todo: select on ctx is probably a good idea + } + } + + return func() reflect.Value { return retVal }, chCtor +} + +func (c *client) sendRequest(ctx context.Context, req request, chCtor makeChanSink) clientResponse { + rchan := make(chan clientResponse, 1) + c.requests <- clientRequest{ + req: req, + ready: rchan, + + retCh: chCtor, + } + var ctxDone <-chan struct{} + var resp clientResponse + + if ctx != nil { + ctxDone = ctx.Done() + } + + // wait for response, handle context cancellation +loop: + for { + select { + case resp = <-rchan: + break loop + case <-ctxDone: // send cancel request + ctxDone = nil + + c.requests <- clientRequest{ + req: request{ + Jsonrpc: "2.0", + Method: wsCancel, + Params: []param{{v: reflect.ValueOf(*req.ID)}}, + }, + } + } + } + + return resp +} + +type rpcFunc struct { + client *client + + ftyp reflect.Type + name string + + nout int + valOut int + errOut int + + hasCtx int + retCh bool +} + +func (fn *rpcFunc) processResponse(resp clientResponse, rval reflect.Value) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = rval + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + if resp.Error != nil { + out[fn.errOut].Set(reflect.ValueOf(resp.Error)) + } + } + + return out +} + +func (fn *rpcFunc) processError(err error) []reflect.Value { + out := make([]reflect.Value, fn.nout) + + if fn.valOut != -1 { + out[fn.valOut] = reflect.New(fn.ftyp.Out(fn.valOut)).Elem() + } + if fn.errOut != -1 { + out[fn.errOut] = reflect.New(errorType).Elem() + out[fn.errOut].Set(reflect.ValueOf(&ErrClient{err})) + } + + return out +} + +func (fn *rpcFunc) handleRpcCall(args []reflect.Value) (results []reflect.Value) { + id := atomic.AddInt64(&fn.client.idCtr, 1) + params := make([]param, len(args)-fn.hasCtx) + for i, arg := range args[fn.hasCtx:] { + params[i] = param{ + v: arg, + } + } + + var ctx context.Context + if fn.hasCtx == 1 { + ctx = args[0].Interface().(context.Context) + } + + retVal := func() reflect.Value { return reflect.Value{} } + + // if the function returns a channel, we need to provide a sink for the + // messages + var chCtor makeChanSink + if fn.retCh { + retVal, chCtor = fn.client.makeOutChan(ctx, fn.ftyp, fn.valOut) + } + + req := request{ + Jsonrpc: "2.0", + ID: &id, + Method: fn.client.namespace + "." + fn.name, + Params: params, + } + + resp := fn.client.sendRequest(ctx, req, chCtor) + + if resp.ID != *req.ID { + return fn.processError(xerrors.New("request and response id didn't match")) + } + + if fn.valOut != -1 && !fn.retCh { + val := reflect.New(fn.ftyp.Out(fn.valOut)) + + if resp.Result != nil { + log.Debugw("rpc result", "type", fn.ftyp.Out(fn.valOut)) + if err := json.Unmarshal(resp.Result, val.Interface()); err != nil { + return fn.processError(xerrors.Errorf("unmarshaling result: %w", err)) + } + } + + retVal = func() reflect.Value { return val.Elem() } + } + + return fn.processResponse(resp, retVal()) +} + +func (c *client) makeRpcFunc(f reflect.StructField) (reflect.Value, error) { + ftyp := f.Type + if ftyp.Kind() != reflect.Func { + return reflect.Value{}, xerrors.New("handler field not a func") + } + + fun := &rpcFunc{ + client: c, + ftyp: ftyp, + name: f.Name, + } + fun.valOut, fun.errOut, fun.nout = processFuncOut(ftyp) + + if ftyp.NumIn() > 0 && ftyp.In(0) == contextType { + fun.hasCtx = 1 + } + fun.retCh = fun.valOut != -1 && ftyp.Out(fun.valOut).Kind() == reflect.Chan + + return reflect.MakeFunc(ftyp, fun.handleRpcCall), nil +} From c7b2bf8100a49f40d000ea1da5a8296f1272dfb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 15:40:42 +0200 Subject: [PATCH 20/21] Fix reading auth headers --- cmd/lotus/rpc.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 096b9b750..a0be6ecbe 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -4,12 +4,19 @@ import ( "net/http" "github.com/filecoin-project/go-lotus/api" + "github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/jsonrpc" ) func serveRPC(a api.FullNode, addr string) error { rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) - http.Handle("/rpc/v0", rpcServer) + + ah := &auth.Handler{ + Verify: a.AuthVerify, + Next: rpcServer.ServeHTTP, + } + + http.Handle("/rpc/v0", ah) return http.ListenAndServe(addr, http.DefaultServeMux) } From 93a8ee11dba98d8a5ddfffaf3b4e1698d7477e1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 24 Jul 2019 19:09:00 +0200 Subject: [PATCH 21/21] jsonrpc: Work with browsers --- cmd/lotus-storage-miner/run.go | 9 ++++++++- lib/jsonrpc/server.go | 17 +++++++++++++++-- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 3368c8f78..1cc209532 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-lotus/api" lcli "github.com/filecoin-project/go-lotus/cli" + "github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/node" "github.com/filecoin-project/go-lotus/node/repo" @@ -72,7 +73,13 @@ var runCmd = &cli.Command{ rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api.PermissionedStorMinerAPI(minerapi)) - http.Handle("/rpc/v0", rpcServer) + + ah := &auth.Handler{ + Verify: minerapi.AuthVerify, + Next: rpcServer.ServeHTTP, + } + + http.Handle("/rpc/v0", ah) return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) }, } diff --git a/lib/jsonrpc/server.go b/lib/jsonrpc/server.go index c6b21a3da..18d2da48f 100644 --- a/lib/jsonrpc/server.go +++ b/lib/jsonrpc/server.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "net/http" + "strings" "github.com/gorilla/websocket" ) @@ -27,9 +28,21 @@ func NewServer() *RPCServer { } } -var upgrader = websocket.Upgrader{} +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http.Request) { + // TODO: allow setting + // (note that we still are mostly covered by jwt tokens) + w.Header().Set("Access-Control-Allow-Origin", "*") + if r.Header.Get("Sec-WebSocket-Protocol") != "" { + w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) + } + + c, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Error(err) @@ -52,7 +65,7 @@ func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http func (s *RPCServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - if r.Header.Get("Connection") == "Upgrade" { + if strings.Contains(r.Header.Get("Connection"), "Upgrade") { s.handleWS(ctx, w, r) return }