Merge pull request #201 from filecoin-project/feat/pond-stopstart

Graceful node shutdown, pond node stopping
This commit is contained in:
Łukasz Magiera 2019-09-17 20:45:18 +02:00 committed by GitHub
commit b33c1e1310
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 391 additions and 199 deletions

View File

@ -124,7 +124,7 @@ func (h *Handler) Run(ctx context.Context) {
// TODO: restore state // TODO: restore state
go func() { go func() {
defer log.Error("quitting deal handler loop") defer log.Warn("quitting deal handler loop")
defer close(h.stopped) defer close(h.stopped)
for { for {

View File

@ -2,8 +2,6 @@ package sub
import ( import (
"context" "context"
"fmt"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -17,6 +15,10 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
for { for {
msg, err := bsub.Next(ctx) msg, err := bsub.Next(ctx)
if err != nil { if err != nil {
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingBlocks loop")
return
}
log.Error("error from block subscription: ", err) log.Error("error from block subscription: ", err)
continue continue
} }
@ -55,7 +57,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub
for { for {
msg, err := msub.Next(ctx) msg, err := msub.Next(ctx)
if err != nil { if err != nil {
fmt.Println("error from message subscription: ", err) log.Warn("error from message subscription: ", err)
if ctx.Err() != nil {
log.Warn("quitting HandleIncomingMessages loop")
return
}
continue continue
} }

View File

@ -127,7 +127,8 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
sourceRepo, genesis, blocks := tu.repoWithChain(tu.t, gen) sourceRepo, genesis, blocks := tu.repoWithChain(tu.t, gen)
var out api.FullNode var out api.FullNode
err := node.New(tu.ctx, // TODO: Don't ignore stop
_, err := node.New(tu.ctx,
node.FullAPI(&out), node.FullAPI(&out),
node.Online(), node.Online(),
node.Repo(sourceRepo), node.Repo(sourceRepo),
@ -149,7 +150,8 @@ func (tu *syncTestUtil) addClientNode() int {
var out api.FullNode var out api.FullNode
err := node.New(tu.ctx, // TODO: Don't ignore stop
_, err := node.New(tu.ctx,
node.FullAPI(&out), node.FullAPI(&out),
node.Online(), node.Online(),
node.Repo(repo.NewMemory(nil)), node.Repo(repo.NewMemory(nil)),

View File

@ -21,7 +21,6 @@ var log = logging.Logger("cli")
const ( const (
metadataTraceConetxt = "traceContext" metadataTraceConetxt = "traceContext"
metadataContext = "context"
) )
// ApiConnector returns API instance // ApiConnector returns API instance
@ -85,22 +84,19 @@ func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, error) {
return client.NewStorageMinerRPC(addr, headers) return client.NewStorageMinerRPC(addr, headers)
} }
func DaemonContext(cctx *cli.Context) context.Context {
if mtCtx, ok := cctx.App.Metadata[metadataTraceConetxt]; ok {
return mtCtx.(context.Context)
}
return context.Background()
}
// ReqContext returns context for cli execution. Calling it for the first time // ReqContext returns context for cli execution. Calling it for the first time
// installs SIGTERM handler that will close returned context. // installs SIGTERM handler that will close returned context.
// Not safe for concurrent execution. // Not safe for concurrent execution.
func ReqContext(cctx *cli.Context) context.Context { func ReqContext(cctx *cli.Context) context.Context {
if uctx, ok := cctx.App.Metadata[metadataContext]; ok { tCtx := DaemonContext(cctx)
// unchecked cast as if something else is in there
// it is crash worthy either way
return uctx.(context.Context)
}
var tCtx context.Context
if mtCtx, ok := cctx.App.Metadata[metadataTraceConetxt]; ok {
tCtx = mtCtx.(context.Context)
} else {
tCtx = context.Background()
}
ctx, done := context.WithCancel(tCtx) ctx, done := context.WithCancel(tCtx)
sigChan := make(chan os.Signal, 2) sigChan := make(chan os.Signal, 2)

View File

@ -67,7 +67,7 @@ func main() {
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Error(err) log.Warn(err)
return return
} }
} }

View File

@ -1,8 +1,11 @@
package main package main
import ( import (
"context"
"net/http" "net/http"
"os" "os"
"os/signal"
"syscall"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -32,14 +35,7 @@ var runCmd = &cli.Command{
if err != nil { if err != nil {
return err return err
} }
ctx := lcli.ReqContext(cctx) ctx := lcli.DaemonContext(cctx)
go func() {
// a hack for now to handle sigint
<-ctx.Done()
os.Exit(0)
}()
v, err := nodeApi.Version(ctx) v, err := nodeApi.Version(ctx)
if err != nil { if err != nil {
@ -61,7 +57,7 @@ var runCmd = &cli.Command{
} }
var minerapi api.StorageMiner var minerapi api.StorageMiner
err = node.New(ctx, stop, err := node.New(ctx,
node.StorageMiner(&minerapi), node.StorageMiner(&minerapi),
node.Online(), node.Online(),
node.Repo(r), node.Repo(r),
@ -101,6 +97,23 @@ var runCmd = &cli.Command{
} }
http.Handle("/rpc/v0", ah) http.Handle("/rpc/v0", ah)
return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux)
srv := &http.Server{Addr: "127.0.0.1:" + cctx.String("api"), Handler: http.DefaultServeMux}
sigChan := make(chan os.Signal, 2)
go func() {
<-sigChan
log.Warn("Shutting down..")
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
return srv.ListenAndServe()
}, },
} }

View File

@ -6,14 +6,13 @@ import (
"context" "context"
"io/ioutil" "io/ioutil"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/modules/testing"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/node" "github.com/filecoin-project/go-lotus/node"
"github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/modules/testing"
"github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/node/repo"
) )
@ -65,7 +64,7 @@ var DaemonCmd = &cli.Command{
} }
var api api.FullNode var api api.FullNode
err = node.New(ctx, stop, err := node.New(ctx,
node.FullAPI(&api), node.FullAPI(&api),
node.Online(), node.Online(),
@ -86,6 +85,6 @@ var DaemonCmd = &cli.Command{
} }
// TODO: properly parse api endpoint (or make it a URL) // TODO: properly parse api endpoint (or make it a URL)
return serveRPC(api, "127.0.0.1:"+cctx.String("api")) return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api"))
}, },
} }

View File

@ -2,7 +2,6 @@ package main
import ( import (
"context" "context"
"log"
"os" "os"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
@ -65,7 +64,7 @@ func main() {
Code: trace.StatusCodeFailedPrecondition, Code: trace.StatusCodeFailedPrecondition,
Message: err.Error(), Message: err.Error(),
}) })
log.Printf("%+v\n", err) log.Warn(err)
} }
return return
} }

View File

@ -1,14 +1,22 @@
package main package main
import ( import (
"context"
"net/http" "net/http"
"os"
"os/signal"
"syscall"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/auth"
"github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node"
logging "github.com/ipfs/go-log"
) )
func serveRPC(a api.FullNode, addr string) error { var log = logging.Logger("main")
func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) rpcServer.Register("Filecoin", api.PermissionedFullAPI(a))
@ -18,5 +26,20 @@ func serveRPC(a api.FullNode, addr string) error {
} }
http.Handle("/rpc/v0", ah) http.Handle("/rpc/v0", ah)
return http.ListenAndServe(addr, http.DefaultServeMux)
srv := &http.Server{Addr: addr, Handler: http.DefaultServeMux}
sigChan := make(chan os.Signal, 2)
go func() {
<-sigChan
if err := stop(context.TODO()); err != nil {
log.Errorf("graceful shutting down failed: %s", err)
}
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
}()
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
return srv.ListenAndServe()
} }

View File

@ -57,7 +57,7 @@ type clientRequest struct {
// ClientCloser is used to close Client from further use // ClientCloser is used to close Client from further use
type ClientCloser func() type ClientCloser func()
// NewClient creates new josnrpc 2.0 client // NewClient creates new jsonrpc 2.0 client
// //
// handler must be pointer to a struct with function fields // handler must be pointer to a struct with function fields
// Returned value closes the client connection // Returned value closes the client connection

View File

@ -2,24 +2,27 @@ package main
import ( import (
"crypto/rand" "crypto/rand"
"fmt"
"github.com/filecoin-project/go-lotus/lib/jsonrpc" "github.com/filecoin-project/go-lotus/lib/jsonrpc"
"github.com/filecoin-project/go-lotus/node/repo"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec"
"sync" "sync"
"sync/atomic"
"time"
"github.com/filecoin-project/go-lotus/node/repo"
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
type NodeState int
const (
NodeUnknown = iota
NodeRunning
NodeStopped
)
type api struct { type api struct {
cmds int32 cmds int32
running map[int32]runningNode running map[int32]*runningNode
runningLk sync.Mutex runningLk sync.Mutex
genesis string genesis string
} }
@ -28,79 +31,12 @@ type nodeInfo struct {
Repo string Repo string
ID int32 ID int32
ApiPort int32 ApiPort int32
State NodeState
FullNode string // only for storage nodes FullNode string // only for storage nodes
Storage bool Storage bool
} }
func (api *api) Spawn() (nodeInfo, error) {
dir, err := ioutil.TempDir(os.TempDir(), "lotus-")
if err != nil {
return nodeInfo{}, err
}
genParam := "--genesis=" + api.genesis
id := atomic.AddInt32(&api.cmds, 1)
if id == 1 {
// make genesis
genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-")
if err != nil {
return nodeInfo{}, err
}
api.genesis = genf.Name()
genParam = "--lotus-make-random-genesis=" + api.genesis
if err := genf.Close(); err != nil {
return nodeInfo{}, err
}
}
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
mux := newWsMux()
cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = []string{"LOTUS_PATH=" + dir}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
mux: mux,
stop: func() {
defer close(mux.stop)
defer errlogfile.Close()
defer logfile.Close()
},
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
}
func (api *api) Nodes() []nodeInfo { func (api *api) Nodes() []nodeInfo {
api.runningLk.Lock() api.runningLk.Lock()
out := make([]nodeInfo, 0, len(api.running)) out := make([]nodeInfo, 0, len(api.running))
@ -135,75 +71,6 @@ func (api *api) TokenFor(id int32) (string, error) {
return string(t), nil return string(t), nil
} }
func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-")
if err != nil {
return nodeInfo{}, err
}
errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nodeInfo{}, err
}
initArgs := []string{"init"}
if fullNodeRepo == api.running[1].meta.Repo {
initArgs = []string{"init", "--actor=t0101", "--genesis-miner"}
}
id := atomic.AddInt32(&api.cmds, 1)
cmd := exec.Command("./lotus-storage-miner", initArgs...)
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Run(); err != nil {
return nodeInfo{}, err
}
time.Sleep(time.Millisecond * 300)
mux := newWsMux()
cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
if err := cmd.Start(); err != nil {
return nodeInfo{}, err
}
info := nodeInfo{
Repo: dir,
ID: id,
ApiPort: 2500 + id,
FullNode: fullNodeRepo,
Storage: true,
}
api.runningLk.Lock()
api.running[id] = runningNode{
cmd: cmd,
meta: info,
mux: mux,
stop: func() {
defer close(mux.stop)
defer errlogfile.Close()
defer logfile.Close()
},
}
api.runningLk.Unlock()
time.Sleep(time.Millisecond * 750) // TODO: Something less terrible
return info, nil
}
func (api *api) FullID(id int32) (int32, error) { func (api *api) FullID(id int32) (int32, error) {
api.runningLk.Lock() api.runningLk.Lock()
defer api.runningLk.Unlock() defer api.runningLk.Unlock()
@ -243,6 +110,19 @@ func (api *api) CreateRandomFile(size int64) (string, error) {
return tf.Name(), nil return tf.Name(), nil
} }
func (api *api) Stop(node int32) error {
api.runningLk.Lock()
nd, ok := api.running[node]
api.runningLk.Unlock()
if !ok {
return nil
}
nd.stop()
return nil
}
type client struct { type client struct {
Nodes func() []nodeInfo Nodes func() []nodeInfo
} }

View File

@ -26,8 +26,14 @@ class Address extends React.Component {
componentDidMount() { componentDidMount() {
this.refresh() this.refresh()
if(!this.props.ts) if(!this.props.ts) {
setInterval(this.refresh, 2050) this.updates = setInterval(this.refresh, 2050)
this.props.client.on('close', () => clearInterval(this.updates))
}
}
componentWillUnmount() {
clearInterval(this.updates)
} }
async refresh() { async refresh() {

View File

@ -19,9 +19,11 @@ class FullNode extends React.Component {
this.add1k = this.add1k.bind(this) this.add1k = this.add1k.bind(this)
this.explorer = this.explorer.bind(this) this.explorer = this.explorer.bind(this)
this.client = this.client.bind(this) this.client = this.client.bind(this)
this.stop = this.stop.bind(this)
this.loadInfo() this.loadInfo()
setInterval(this.loadInfo, 2050) let updates = setInterval(this.loadInfo, 2050)
this.props.client.on('close', () => clearInterval(updates))
} }
async loadInfo() { async loadInfo() {
@ -95,6 +97,10 @@ class FullNode extends React.Component {
this.props.mountWindow((onClose) => <Client onClose={onClose} node={this.props.node} client={this.props.client} pondClient={this.props.pondClient} mountWindow={this.props.mountWindow}/>) this.props.mountWindow((onClose) => <Client onClose={onClose} node={this.props.node} client={this.props.client} pondClient={this.props.pondClient} mountWindow={this.props.mountWindow}/>)
} }
async stop() {
await this.props.stop()
}
render() { render() {
let runtime = <div></div> let runtime = <div></div>
@ -165,7 +171,8 @@ class FullNode extends React.Component {
<Cristal <Cristal
title={"Node " + this.props.node.ID} title={"Node " + this.props.node.ID}
initialPosition={{x: this.props.node.ID*30, y: this.props.node.ID * 30}} initialPosition={{x: this.props.node.ID*30, y: this.props.node.ID * 30}}
initialSize={{width: 690, height: 300}} > initialSize={{width: 690, height: 300}}
onClose={this.stop} >
<div className="CristalScroll"> <div className="CristalScroll">
<div className="FullNode"> <div className="FullNode">
{runtime} {runtime}

View File

@ -9,6 +9,8 @@ import pushMessage from "./chain/send";
import Logs from "./Logs"; import Logs from "./Logs";
import StorageNodeInit from "./StorageNodeInit"; import StorageNodeInit from "./StorageNodeInit";
const [NodeUnknown, NodeRunning, NodeStopped] = [0, 1, 2]
class NodeList extends React.Component { class NodeList extends React.Component {
constructor(props) { constructor(props) {
super(props) super(props)
@ -53,6 +55,7 @@ class NodeList extends React.Component {
give1k={this.transfer1kFrom1} give1k={this.transfer1kFrom1}
mountWindow={this.props.mountWindow} mountWindow={this.props.mountWindow}
spawnStorageNode={this.spawnStorageNode} spawnStorageNode={this.spawnStorageNode}
stop={this.stopNode(node.ID, onClose)}
/>) />)
} else { } else {
const fullId = await this.props.client.call('Pond.FullID', [node.ID]) const fullId = await this.props.client.call('Pond.FullID', [node.ID])
@ -62,6 +65,7 @@ class NodeList extends React.Component {
pondClient={this.props.client} pondClient={this.props.client}
fullConn={this.state.nodes[fullId].conn} fullConn={this.state.nodes[fullId].conn}
mountWindow={this.props.mountWindow} mountWindow={this.props.mountWindow}
stop={this.stopNode(node.ID, onClose)}
/>) />)
} }
}) })
@ -72,7 +76,7 @@ class NodeList extends React.Component {
const nodes = nds.reduce((o, i) => {o[i.ID] = i; return o}, {}) const nodes = nds.reduce((o, i) => {o[i.ID] = i; return o}, {})
console.log('nds', nodes) console.log('nds', nodes)
Object.keys(nodes).map(n => nodes[n]).forEach(n => this.mountNode(n)) Object.keys(nodes).map(n => nodes[n]).filter(n => n.State === NodeRunning).forEach(n => this.mountNode(n))
this.setState({existingLoaded: true, nodes: nodes}) this.setState({existingLoaded: true, nodes: nodes})
} }
@ -114,6 +118,23 @@ class NodeList extends React.Component {
//this.setState(state => ({nodes: {...state.nodes, [node.ID]: node}})) //this.setState(state => ({nodes: {...state.nodes, [node.ID]: node}}))
} }
stopNode = (id, closeWindow) => async () => {
this.state.nodes[id].conn.close()
await this.props.client.call('Pond.Stop', [id])
closeWindow()
this.setState(prev => ({
nodes: {
...prev.nodes,
[id]: {...(prev.nodes[id]), State: NodeStopped, conn: undefined}
}
}))
}
startNode = (id) => async () => {
let node = await this.props.client.call('Pond.RestartNode', [Number(id)])
await this.mountNode(node)
}
connMgr() { connMgr() {
this.setState({showConnMgr: true}) this.setState({showConnMgr: true})
} }
@ -155,6 +176,9 @@ class NodeList extends React.Component {
info = <span>{nd.peerid}</span> info = <span>{nd.peerid}</span>
logs = <a href='#' onClick={() => this.props.mountWindow(cl => <Logs node={nd.ID} onClose={cl}/>)}>[logs]</a> logs = <a href='#' onClick={() => this.props.mountWindow(cl => <Logs node={nd.ID} onClose={cl}/>)}>[logs]</a>
} }
if (nd.State === NodeStopped) {
info = <span>[stopped] <a href="#" onClick={this.startNode(n)}>[START]</a></span>
}
return <div key={n}> return <div key={n}>
{n} {type} {logs} {info} {n} {type} {logs} {info}

View File

@ -29,6 +29,7 @@ class StorageNode extends React.Component {
this.loadInfo = this.loadInfo.bind(this) this.loadInfo = this.loadInfo.bind(this)
this.sealGarbage = this.sealGarbage.bind(this) this.sealGarbage = this.sealGarbage.bind(this)
this.stop = this.stop.bind(this)
this.connect() this.connect()
} }
@ -59,7 +60,8 @@ class StorageNode extends React.Component {
// this.props.onConnect(client, id) // TODO: dedupe connecting part // this.props.onConnect(client, id) // TODO: dedupe connecting part
this.loadInfo() this.loadInfo()
setInterval(this.loadInfo, 1050) let updates = setInterval(this.loadInfo, 1050)
client.on('close', () => clearInterval(updates))
}) })
console.log(token) // todo: use console.log(token) // todo: use
@ -89,6 +91,10 @@ class StorageNode extends React.Component {
await this.state.client.call("Filecoin.StoreGarbageData", []) await this.state.client.call("Filecoin.StoreGarbageData", [])
} }
async stop() {
await this.props.stop()
}
render() { render() {
let runtime = <div></div> let runtime = <div></div>
if (this.state.actor) { if (this.state.actor) {
@ -117,7 +123,8 @@ class StorageNode extends React.Component {
return <Cristal return <Cristal
title={"Storage Miner Node " + this.props.node.ID} title={"Storage Miner Node " + this.props.node.ID}
initialPosition={{x: this.props.node.ID*30, y: this.props.node.ID * 30}}> initialPosition={{x: this.props.node.ID*30, y: this.props.node.ID * 30}}
onClose={this.stop} >
<div className="CristalScroll"> <div className="CristalScroll">
<div className="StorageNode"> <div className="StorageNode">
{runtime} {runtime}

View File

@ -130,7 +130,7 @@ var runCmd = &cli.Command{
Usage: "run lotuspond daemon", Usage: "run lotuspond daemon",
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
rpcServer := jsonrpc.NewServer() rpcServer := jsonrpc.NewServer()
a := &api{running: map[int32]runningNode{}} a := &api{running: map[int32]*runningNode{}}
rpcServer.Register("Pond", a) rpcServer.Register("Pond", a)
http.Handle("/", http.FileServer(http.Dir("lotuspond/front/build"))) http.Handle("/", http.FileServer(http.Dir("lotuspond/front/build")))

218
lotuspond/spawn.go Normal file
View File

@ -0,0 +1,218 @@
package main
import (
"fmt"
"golang.org/x/xerrors"
"io"
"io/ioutil"
"os"
"os/exec"
"sync/atomic"
"time"
)
// Spawn starts a new full-node daemon in a fresh temporary repo and
// registers it in api.running. The first node spawned (id 1) also
// generates the random genesis file reused by all later nodes.
// Returns the node's metadata (repo path, id, api port).
func (api *api) Spawn() (nodeInfo, error) {
	dir, err := ioutil.TempDir(os.TempDir(), "lotus-")
	if err != nil {
		return nodeInfo{}, err
	}

	genParam := "--genesis=" + api.genesis
	id := atomic.AddInt32(&api.cmds, 1)
	if id == 1 {
		// make genesis: the very first node creates the random genesis
		// that every subsequent node will point at via api.genesis.
		genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-")
		if err != nil {
			return nodeInfo{}, err
		}

		api.genesis = genf.Name()
		genParam = "--lotus-make-random-genesis=" + api.genesis

		if err := genf.Close(); err != nil {
			return nodeInfo{}, err
		}
	}

	errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nodeInfo{}, err
	}
	logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		errlogfile.Close() // don't leak the already-open err log on failure
		return nodeInfo{}, err
	}

	mux := newWsMux()

	cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
	cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
	cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
	cmd.Env = []string{"LOTUS_PATH=" + dir}
	if err := cmd.Start(); err != nil {
		// the process never started; release both log files
		logfile.Close()
		errlogfile.Close()
		return nodeInfo{}, err
	}

	info := nodeInfo{
		Repo:    dir,
		ID:      id,
		ApiPort: 2500 + id,

		State: NodeRunning,
	}

	api.runningLk.Lock()
	api.running[id] = &runningNode{
		cmd:  cmd,
		meta: info,

		mux: mux,
		stop: func() {
			// Best-effort interrupt/reap: the process may already have
			// exited, so errors here are deliberately ignored.
			_ = cmd.Process.Signal(os.Interrupt)
			_, _ = cmd.Process.Wait()

			api.runningLk.Lock()
			api.running[id].meta.State = NodeStopped
			api.runningLk.Unlock()

			// Log files and the ws mux are intentionally kept open so a
			// later restart can reuse the same writers (see RestartNode).
			//logfile.Close()
			//errlogfile.Close()
			//close(mux.stop)
		},
	}
	api.runningLk.Unlock()

	time.Sleep(time.Millisecond * 750) // TODO: Something less terrible

	return info, nil
}
// SpawnStorage initializes and starts a storage-miner node attached to the
// full node whose repo is fullNodeRepo, registering it in api.running.
// The miner attached to node 1's repo becomes the genesis miner.
func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) {
	dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-")
	if err != nil {
		return nodeInfo{}, err
	}

	errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return nodeInfo{}, err
	}
	logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		errlogfile.Close() // don't leak the already-open err log on failure
		return nodeInfo{}, err
	}

	// api.running is mutated under runningLk elsewhere (Spawn, stop
	// closures), so take the lock for this read too.
	// NOTE(review): assumes node 1 always exists by the time a storage
	// node is spawned — same assumption as before, panics otherwise.
	api.runningLk.Lock()
	genesisMiner := fullNodeRepo == api.running[1].meta.Repo
	api.runningLk.Unlock()

	initArgs := []string{"init"}
	if genesisMiner {
		initArgs = []string{"init", "--actor=t0101", "--genesis-miner"}
	}

	id := atomic.AddInt32(&api.cmds, 1)
	cmd := exec.Command("./lotus-storage-miner", initArgs...)
	cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile)
	cmd.Stdout = io.MultiWriter(os.Stdout, logfile)
	cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
	if err := cmd.Run(); err != nil {
		logfile.Close()
		errlogfile.Close()
		return nodeInfo{}, err
	}

	time.Sleep(time.Millisecond * 300)

	mux := newWsMux()

	cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id))
	cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
	cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
	cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo}
	if err := cmd.Start(); err != nil {
		// the run process never started; release both log files
		logfile.Close()
		errlogfile.Close()
		return nodeInfo{}, err
	}

	info := nodeInfo{
		Repo:    dir,
		ID:      id,
		ApiPort: 2500 + id,

		State: NodeRunning,

		FullNode: fullNodeRepo,
		Storage:  true,
	}

	api.runningLk.Lock()
	api.running[id] = &runningNode{
		cmd:  cmd,
		meta: info,

		mux: mux,
		stop: func() {
			// Best-effort interrupt/reap: the process may already have
			// exited, so errors here are deliberately ignored.
			_ = cmd.Process.Signal(os.Interrupt)
			_, _ = cmd.Process.Wait()

			api.runningLk.Lock()
			api.running[id].meta.State = NodeStopped
			api.runningLk.Unlock()

			// Log files and the ws mux are intentionally kept open so a
			// later restart can reuse the same writers (see RestartNode).
			//logfile.Close()
			//errlogfile.Close()
			//close(mux.stop)
		},
	}
	api.runningLk.Unlock()

	time.Sleep(time.Millisecond * 750) // TODO: Something less terrible

	return info, nil
}
// RestartNode brings a previously stopped node back up, reusing the old
// process's repo, environment, and log/mux writers. Returns the refreshed
// node metadata, or an error if the node is unknown or still running.
func (api *api) RestartNode(id int32) (nodeInfo, error) {
	api.runningLk.Lock()
	defer api.runningLk.Unlock()

	node, found := api.running[id]
	if !found {
		return nodeInfo{}, xerrors.New("node not found")
	}

	if node.meta.State != NodeStopped {
		return nodeInfo{}, xerrors.New("node not stopped")
	}

	apiArg := fmt.Sprintf("%d", 2500+id)

	var proc *exec.Cmd
	if node.meta.Storage {
		proc = exec.Command("./lotus-storage-miner", "run", "--api", apiArg)
	} else {
		proc = exec.Command("./lotus", "daemon", "--api", apiArg)
	}

	// recycle the writers and environment from the previous incarnation
	proc.Stderr = node.cmd.Stderr
	proc.Stdout = node.cmd.Stdout
	proc.Env = node.cmd.Env

	if err := proc.Start(); err != nil {
		return nodeInfo{}, err
	}

	node.cmd = proc
	node.stop = func() {
		proc.Process.Signal(os.Interrupt)
		proc.Process.Wait()

		api.runningLk.Lock()
		api.running[id].meta.State = NodeStopped
		api.runningLk.Unlock()

		//logfile.Close()
		//errlogfile.Close()
		//close(mux.stop)
	}
	node.meta.State = NodeRunning

	time.Sleep(time.Millisecond * 750) // TODO: Something less terrible

	return node.meta, nil
}

View File

@ -337,8 +337,10 @@ func FullAPI(out *api.FullNode) Option {
} }
} }
type StopFunc func(context.Context) error
// New builds and starts new Filecoin node // New builds and starts new Filecoin node
func New(ctx context.Context, opts ...Option) error { func New(ctx context.Context, opts ...Option) (StopFunc, error) {
settings := Settings{ settings := Settings{
modules: map[interface{}]fx.Option{}, modules: map[interface{}]fx.Option{},
invokes: make([]fx.Option, _nInvokes), invokes: make([]fx.Option, _nInvokes),
@ -346,7 +348,7 @@ func New(ctx context.Context, opts ...Option) error {
// apply module options in the right order // apply module options in the right order
if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil {
return err return nil, err
} }
// gather constructors for fx.Options // gather constructors for fx.Options
@ -374,10 +376,10 @@ func New(ctx context.Context, opts ...Option) error {
// correctly // correctly
if err := app.Start(ctx); err != nil { if err := app.Start(ctx); err != nil {
// comment fx.NopLogger few lines above for easier debugging // comment fx.NopLogger few lines above for easier debugging
return err return nil, err
} }
return nil return app.Stop, nil
} }
// In-memory / testing // In-memory / testing

View File

@ -147,9 +147,13 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error { OnStart: func(ctx context.Context) error {
log.Infof("registering miner '%s' with full node", minerAddr) log.Infof("Registering miner '%s' with full node", minerAddr)
return api.MinerRegister(ctx, minerAddr) return api.MinerRegister(ctx, minerAddr)
}, },
OnStop: func(ctx context.Context) error {
log.Infof("Unregistering miner '%s' from full node", minerAddr)
return api.MinerUnregister(ctx, minerAddr)
},
}) })
return nil return nil
} }

View File

@ -36,7 +36,8 @@ func builder(t *testing.T, n int) []api.FullNode {
} }
var err error var err error
err = node.New(ctx, // TODO: Don't ignore stop
_, err = node.New(ctx,
node.FullAPI(&out[i]), node.FullAPI(&out[i]),
node.Online(), node.Online(),
node.Repo(repo.NewMemory(nil)), node.Repo(repo.NewMemory(nil)),

View File

@ -181,6 +181,11 @@ func (fsr *fsLockedRepo) Close() error {
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return xerrors.Errorf("could not remove API file: %w", err) return xerrors.Errorf("could not remove API file: %w", err)
} }
if fsr.ds != nil {
if err := fsr.ds.Close(); err != nil {
return xerrors.Errorf("could not close datastore: %w", err)
}
}
err = fsr.closer.Close() err = fsr.closer.Close()
fsr.closer = nil fsr.closer = nil