pond: subcommands to run cmds on nodes

This commit is contained in:
Łukasz Magiera 2019-08-13 01:09:08 +02:00
parent 5ab1b1caaf
commit 94044b40a2
4 changed files with 294 additions and 173 deletions

View File

@ -75,7 +75,7 @@ Pond is a graphical testbed for lotus. It can be used to spin up nodes, connect
them in a given topology, start them mining, and observe how they function over them in a given topology, start them mining, and observe how they function over
time. time.
To try it out, run `make pond`, then run the `pond` binary that gets created. To try it out, run `make pond`, then run `./pond run`.
Once it is running, visit localhost:2222 in your browser. Once it is running, visit localhost:2222 in your browser.
## Tracing ## Tracing

View File

@ -117,7 +117,7 @@ func getInclusionProof(ref string, status sectorbuilder.SectorSealingStatus) (Pi
return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID) return PieceInclusionProof{}, xerrors.Errorf("pieceInclusionProof for %s in sector %d not found", ref, status.SectorID)
} }
func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error){ func (h *Handler) pollSectorSealed(deal MinerDeal) (status sectorbuilder.SectorSealingStatus, err error) {
loop: loop:
for { for {
status, err = h.sb.SealStatus(deal.SectorID) status, err = h.sb.SealStatus(deal.SectorID)

203
lotuspond/api.go Normal file
View File

@ -0,0 +1,203 @@
package main
import (
"fmt"
"github.com/filecoin-project/go-lotus/lib/jsonrpc"
"io"
"io/ioutil"
"os"
"os/exec"
"sync"
"sync/atomic"
"time"
"github.com/filecoin-project/go-lotus/node/repo"
"golang.org/x/xerrors"
)
type api struct {
cmds int32
running map[int32]runningNode
runningLk sync.Mutex
genesis string
}
type nodeInfo struct {
Repo string
ID int32
ApiPort int32
FullNode string // only for storage nodes
Storage bool
}
func (api *api) Spawn() (nodeInfo, error) {
dir, err := ioutil.TempDir(os.TempDir(), "lotus-")
if err != nil {
return nodeInfo{}, err
}
genParam := "--genesis=" + api.genesis
id := atomic.AddInt32(&api.cmds, 1)
if id == 1 {
// make genesis
genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-")
if err != nil {
return nodeInfo{}, err
}
api.genesis = genf.Name()
genParam = "--lotus-make-random-genesis=" + api.genesis
if err := genf.Close(); err != nil {
return nodeInfo{}, err
}
}
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_PATH=" + dir}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
stop: func() {
defer errlogfile.Close()
defer logfile.Close()
},
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
}
func (api *api) Nodes() []nodeInfo {
api.runningLk.Lock()
out := make([]nodeInfo, 0, len(api.running))
for _, node := range api.running {
out = append(out, node.meta)
}
api.runningLk.Unlock()
return out
}
func (api *api) TokenFor(id int32) (string, error) {
api.runningLk.Lock()
defer api.runningLk.Unlock()
rnd, ok := api.running[id]
if !ok {
return "", xerrors.New("no running node with this ID")
}
r, err := repo.NewFS(rnd.meta.Repo)
if err != nil {
return "", err
}
t, err := r.APIToken()
if err != nil {
return "", err
}
return string(t), nil
}
func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-")
if err != nil {
return nodeInfo{}, err
}
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
id := atomic.AddInt32(&api.cmds, 1)
cmd := exec.Command("./lotus-storage-miner", "init")
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Run(); err != nil {
return nodeInfo{}, err
}
time.Sleep(time.Millisecond * 300)
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
FullNode: fullNodeRepo,
Storage: true,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
stop: func() {
defer errlogfile.Close()
defer logfile.Close()
},
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
}
type client struct {
Nodes func() []nodeInfo
}
func apiClient() (*client, error) {
c := &client{}
if _, err := jsonrpc.NewClient("ws://"+listenAddr+"/rpc/v0", "Pond", c, nil); err != nil {
return nil, err
}
return c, nil
}

View File

@ -2,19 +2,14 @@ package main
import ( import (
"fmt" "fmt"
"io"
"io/ioutil"
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"sync" "strconv"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node/repo"
"gopkg.in/urfave/cli.v2"
) )
const listenAddr = "127.0.0.1:2222" const listenAddr = "127.0.0.1:2222"
@ -26,179 +21,87 @@ type runningNode struct {
stop func() stop func()
} }
type api struct { var onCmd = &cli.Command{
cmds int32 Name: "on",
running map[int32]runningNode Usage: "run a command on a given node",
runningLk sync.Mutex Action: func(cctx *cli.Context) error {
genesis string client, err := apiClient()
}
type nodeInfo struct {
Repo string
ID int32
ApiPort int32
Storage bool
}
func (api *api) Spawn() (nodeInfo, error) {
dir, err := ioutil.TempDir(os.TempDir(), "lotus-")
if err != nil { if err != nil {
return nodeInfo{}, err return err
} }
genParam := "--genesis=" + api.genesis nd, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32)
id := atomic.AddInt32(&api.cmds, 1)
if id == 1 {
// make genesis
genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-")
if err != nil { if err != nil {
return nodeInfo{}, err return err
} }
api.genesis = genf.Name() node := client.Nodes()[nd-1] // -1 to match numbers in UI
genParam = "--lotus-make-random-genesis=" + api.genesis var cmd *exec.Cmd
if !node.Storage {
if err := genf.Close(); err != nil { cmd = exec.Command("./lotus", cctx.Args().Slice()[1:]...)
return nodeInfo{}, err cmd.Env = []string{
"LOTUS_PATH=" + node.Repo,
}
} else {
cmd = exec.Command("./lotus-storage-miner")
cmd.Env = []string{
"LOTUS_STORAGE_PATH=" + node.Repo,
"LOTUS_PATH=" + node.FullNode,
}
} }
} cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) err = cmd.Run()
if err != nil { return err
return nodeInfo{}, err
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_PATH=" + dir}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
stop: func() {
defer errlogfile.Close()
defer logfile.Close()
}, },
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
} }
func (api *api) Nodes() []nodeInfo { var shCmd = &cli.Command{
api.runningLk.Lock() Name: "sh",
out := make([]nodeInfo, 0, len(api.running)) Usage: "spawn shell with node shell variables set",
for _, node := range api.running { Action: func(cctx *cli.Context) error {
out = append(out, node.meta) client, err := apiClient()
}
api.runningLk.Unlock()
return out
}
func (api *api) TokenFor(id int32) (string, error) {
api.runningLk.Lock()
defer api.runningLk.Unlock()
rnd, ok := api.running[id]
if !ok {
return "", errors.New("no running node with this ID")
}
r, err := repo.NewFS(rnd.meta.Repo)
if err != nil { if err != nil {
return "", err return err
} }
t, err := r.APIToken() nd, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32)
if err != nil { if err != nil {
return "", err return err
} }
return string(t), nil node := client.Nodes()[nd-1] // -1 to match numbers in UI
} shcmd := exec.Command("/bin/bash")
if !node.Storage {
func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { shcmd.Env = []string{
dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-") "LOTUS_PATH=" + node.Repo,
if err != nil { }
return nodeInfo{}, err } else {
shcmd.Env = []string{
"LOTUS_STORAGE_PATH=" + node.Repo,
"LOTUS_PATH=" + node.FullNode,
}
} }
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) shcmd.Stdin = os.Stdin
if err != nil { shcmd.Stdout = os.Stdout
return nodeInfo{}, err shcmd.Stderr = os.Stderr
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
id := atomic.AddInt32(&api.cmds, 1) fmt.Printf("Entering shell for Node %d\n", nd)
cmd := exec.Command("./lotus-storage-miner", "init") err = shcmd.Run()
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile) fmt.Printf("Closed pond shell\n")
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Run(); err != nil {
return nodeInfo{}, err
}
time.Sleep(time.Millisecond * 300) return err
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
Storage: true,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
stop: func() {
defer errlogfile.Close()
defer logfile.Close()
}, },
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
} }
func main() { var runCmd = &cli.Command{
Name: "run",
Usage: "run lotuspond daemon",
Action: func(cctx *cli.Context) error {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Pond", &api{running: map[int32]runningNode{}}) rpcServer.Register("Pond", &api{running: map[int32]runningNode{}})
@ -206,5 +109,20 @@ func main() {
http.Handle("/rpc/v0", rpcServer) http.Handle("/rpc/v0", rpcServer)
fmt.Printf("Listening on http://%s\n", listenAddr) fmt.Printf("Listening on http://%s\n", listenAddr)
http.ListenAndServe(listenAddr, nil) return http.ListenAndServe(listenAddr, nil)
},
}
func main() {
app := &cli.App{
Name: "pond",
Commands: []*cli.Command{
runCmd,
shCmd,
onCmd,
},
}
if err := app.Run(os.Args); err != nil {
panic(err)
}
} }