From 76177907f2d593a596588c9edb99f934480b0b53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 14:03:28 +0200 Subject: [PATCH 1/8] pond: Allow to stop full nodes --- lotuspond/api.go | 58 ++++++++++++++++++++++++++++----- lotuspond/front/src/Address.js | 6 ++-- lotuspond/front/src/FullNode.js | 11 +++++-- lotuspond/front/src/NodeList.js | 24 +++++++++++++- lotuspond/main.go | 2 +- 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/lotuspond/api.go b/lotuspond/api.go index 3e3b75c4f..1dc8aeab3 100644 --- a/lotuspond/api.go +++ b/lotuspond/api.go @@ -17,9 +17,17 @@ import ( "golang.org/x/xerrors" ) +type NodeState int + +const ( + NodeUnknown = iota + NodeRunning + NodeStopped +) + type api struct { cmds int32 - running map[int32]runningNode + running map[int32]*runningNode runningLk sync.Mutex genesis string } @@ -28,6 +36,7 @@ type nodeInfo struct { Repo string ID int32 ApiPort int32 + State NodeState FullNode string // only for storage nodes Storage bool @@ -80,18 +89,27 @@ func (api *api) Spawn() (nodeInfo, error) { Repo: dir, ID: id, ApiPort: 2500 + id, + State: NodeRunning, } api.runningLk.Lock() - api.running[id] = runningNode{ + api.running[id] = &runningNode{ cmd: cmd, meta: info, mux: mux, stop: func() { - defer close(mux.stop) - defer errlogfile.Close() - defer logfile.Close() + cmd.Process.Signal(os.Interrupt) + cmd.Process.Wait() + + api.runningLk.Lock() + api.running[id].meta.State = NodeStopped + api.runningLk.Unlock() + + logfile.Close() + errlogfile.Close() + + close(mux.stop) }, } api.runningLk.Unlock() @@ -180,21 +198,30 @@ func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { Repo: dir, ID: id, ApiPort: 2500 + id, + State: NodeRunning, FullNode: fullNodeRepo, Storage: true, } api.runningLk.Lock() - api.running[id] = runningNode{ + api.running[id] = &runningNode{ cmd: cmd, meta: info, mux: mux, stop: func() { - defer close(mux.stop) - defer errlogfile.Close() - defer logfile.Close() + cmd.Process.Signal(os.Interrupt) + cmd.Process.Wait() + + api.runningLk.Lock() + api.running[id].meta.State = NodeStopped + api.runningLk.Unlock() + + logfile.Close() + errlogfile.Close() + + close(mux.stop) }, } api.runningLk.Unlock() @@ -243,6 +270,19 @@ func (api *api) CreateRandomFile(size int64) (string, error) { return tf.Name(), nil } +func (api *api) Stop(node int32) error { + api.runningLk.Lock() + nd, ok := api.running[node] + api.runningLk.Unlock() + + if !ok { + return nil + } + + nd.stop() + return nil +} + type client struct { Nodes func() []nodeInfo } diff --git a/lotuspond/front/src/Address.js b/lotuspond/front/src/Address.js index d92cf35d2..f3c7df73d 100644 --- a/lotuspond/front/src/Address.js +++ b/lotuspond/front/src/Address.js @@ -26,8 +26,10 @@ class Address extends React.Component { componentDidMount() { this.refresh() - if(!this.props.ts) - setInterval(this.refresh, 2050) + if(!this.props.ts) { + let updates = setInterval(this.refresh, 2050) + this.props.client.on('close', () => clearInterval(updates)) + } } async refresh() { diff --git a/lotuspond/front/src/FullNode.js b/lotuspond/front/src/FullNode.js index 312e917db..7889e91c3 100644 --- a/lotuspond/front/src/FullNode.js +++ b/lotuspond/front/src/FullNode.js @@ -19,9 +19,11 @@ class FullNode extends React.Component { this.add1k = this.add1k.bind(this) this.explorer = this.explorer.bind(this) this.client = this.client.bind(this) + this.stop = this.stop.bind(this) this.loadInfo() - setInterval(this.loadInfo, 2050) + let updates = setInterval(this.loadInfo, 2050) + this.props.client.on('close', () => clearInterval(updates)) } async loadInfo() { @@ -95,6 +97,10 @@ class FullNode extends React.Component { this.props.mountWindow((onClose) => ) } + async stop() { + await this.props.stop() + } + render() { let runtime =
@@ -165,7 +171,8 @@ class FullNode extends React.Component { + initialSize={{width: 690, height: 300}} + onClose={this.stop} >
{runtime} diff --git a/lotuspond/front/src/NodeList.js b/lotuspond/front/src/NodeList.js index 7399530aa..367fe1680 100644 --- a/lotuspond/front/src/NodeList.js +++ b/lotuspond/front/src/NodeList.js @@ -9,6 +9,8 @@ import pushMessage from "./chain/send"; import Logs from "./Logs"; import StorageNodeInit from "./StorageNodeInit"; +const [NodeUnknown, NodeRunning, NodeStopped] = [0, 1, 2] + class NodeList extends React.Component { constructor(props) { super(props) @@ -53,6 +55,7 @@ class NodeList extends React.Component { give1k={this.transfer1kFrom1} mountWindow={this.props.mountWindow} spawnStorageNode={this.spawnStorageNode} + stop={this.stopNode(node.ID, onClose)} />) } else { const fullId = await this.props.client.call('Pond.FullID', [node.ID]) @@ -72,7 +75,7 @@ class NodeList extends React.Component { const nodes = nds.reduce((o, i) => {o[i.ID] = i; return o}, {}) console.log('nds', nodes) - Object.keys(nodes).map(n => nodes[n]).forEach(n => this.mountNode(n)) + Object.keys(nodes).map(n => nodes[n]).filter(n => n.State === NodeRunning).forEach(n => this.mountNode(n)) this.setState({existingLoaded: true, nodes: nodes}) } @@ -114,6 +117,22 @@ class NodeList extends React.Component { //this.setState(state => ({nodes: {...state.nodes, [node.ID]: node}})) } + stopNode = (id, closeWindow) => async () => { + this.state.nodes[id].conn.close() + await this.props.client.call('Pond.Stop', [id]) + closeWindow() + this.setState(prev => ({ + nodes: { + ...prev.nodes, + [id]: {...(prev.nodes[id]), State: NodeStopped, conn: undefined} + } + })) + } + + startNode = (id) => async () => { + + } + connMgr() { this.setState({showConnMgr: true}) } @@ -155,6 +174,9 @@ class NodeList extends React.Component { info = {nd.peerid} logs = this.props.mountWindow(cl => )}>[logs] } + if (nd.State === NodeStopped) { + info = [stopped] [START] + } return
{n} {type} {logs} {info} diff --git a/lotuspond/main.go b/lotuspond/main.go index 2bd1ea3dd..8a114939e 100644 --- a/lotuspond/main.go +++ b/lotuspond/main.go @@ -130,7 +130,7 @@ var runCmd = &cli.Command{ Usage: "run lotuspond daemon", Action: func(cctx *cli.Context) error { rpcServer := jsonrpc.NewServer() - a := &api{running: map[int32]runningNode{}} + a := &api{running: map[int32]*runningNode{}} rpcServer.Register("Pond", a) http.Handle("/", http.FileServer(http.Dir("lotuspond/front/build"))) From 9353cdb537ca1dcef60422a8a5414565cede7f69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 14:36:17 +0200 Subject: [PATCH 2/8] pond: Restarting full nodes --- lotuspond/api.go | 162 +----------------------- lotuspond/front/src/NodeList.js | 3 +- lotuspond/spawn.go | 218 ++++++++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 162 deletions(-) create mode 100644 lotuspond/spawn.go diff --git a/lotuspond/api.go b/lotuspond/api.go index 1dc8aeab3..c3bee1ea8 100644 --- a/lotuspond/api.go +++ b/lotuspond/api.go @@ -2,17 +2,12 @@ package main import ( "crypto/rand" - "fmt" "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/node/repo" "io" "io/ioutil" "os" - "os/exec" "sync" - "sync/atomic" - "time" - - "github.com/filecoin-project/go-lotus/node/repo" "golang.org/x/xerrors" ) @@ -42,83 +37,6 @@ type nodeInfo struct { Storage bool } -func (api *api) Spawn() (nodeInfo, error) { - dir, err := ioutil.TempDir(os.TempDir(), "lotus-") - if err != nil { - return nodeInfo{}, err - } - - genParam := "--genesis=" + api.genesis - id := atomic.AddInt32(&api.cmds, 1) - if id == 1 { - // make genesis - genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-") - if err != nil { - return nodeInfo{}, err - } - - api.genesis = genf.Name() - genParam = "--lotus-make-random-genesis=" + api.genesis - - if err := genf.Close(); err != nil { - return nodeInfo{}, err - } - - } - - errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nodeInfo{}, err - } - logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nodeInfo{}, err - } - - mux := newWsMux() - - cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id)) - cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) - cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) - cmd.Env = []string{"LOTUS_PATH=" + dir} - if err := cmd.Start(); err != nil { - return nodeInfo{}, err - } - - info := nodeInfo{ - Repo: dir, - ID: id, - ApiPort: 2500 + id, - State: NodeRunning, - } - - api.runningLk.Lock() - api.running[id] = &runningNode{ - cmd: cmd, - meta: info, - - mux: mux, - stop: func() { - cmd.Process.Signal(os.Interrupt) - cmd.Process.Wait() - - api.runningLk.Lock() - api.running[id].meta.State = NodeStopped - api.runningLk.Unlock() - - logfile.Close() - errlogfile.Close() - - close(mux.stop) - }, - } - api.runningLk.Unlock() - - time.Sleep(time.Millisecond * 750) // TODO: Something less terrible - - return info, nil -} - func (api *api) Nodes() []nodeInfo { api.runningLk.Lock() out := make([]nodeInfo, 0, len(api.running)) @@ -153,84 +71,6 @@ func (api *api) TokenFor(id int32) (string, error) { return string(t), nil } -func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { - dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-") - if err != nil { - return nodeInfo{}, err - } - - errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nodeInfo{}, err - } - logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return nodeInfo{}, err - } - - initArgs := []string{"init"} - if fullNodeRepo == api.running[1].meta.Repo { - initArgs = []string{"init", "--actor=t0101", "--genesis-miner"} - } - - id := atomic.AddInt32(&api.cmds, 1) - cmd := exec.Command("./lotus-storage-miner", initArgs...) - cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile) - cmd.Stdout = io.MultiWriter(os.Stdout, logfile) - cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo} - if err := cmd.Run(); err != nil { - return nodeInfo{}, err - } - - time.Sleep(time.Millisecond * 300) - - mux := newWsMux() - - cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) - cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) - cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) - cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo} - if err := cmd.Start(); err != nil { - return nodeInfo{}, err - } - - info := nodeInfo{ - Repo: dir, - ID: id, - ApiPort: 2500 + id, - State: NodeRunning, - - FullNode: fullNodeRepo, - Storage: true, - } - - api.runningLk.Lock() - api.running[id] = &runningNode{ - cmd: cmd, - meta: info, - - mux: mux, - stop: func() { - cmd.Process.Signal(os.Interrupt) - cmd.Process.Wait() - - api.runningLk.Lock() - api.running[id].meta.State = NodeStopped - api.runningLk.Unlock() - - logfile.Close() - errlogfile.Close() - - close(mux.stop) - }, - } - api.runningLk.Unlock() - - time.Sleep(time.Millisecond * 750) // TODO: Something less terrible - - return info, nil -} - func (api *api) FullID(id int32) (int32, error) { api.runningLk.Lock() defer api.runningLk.Unlock() diff --git a/lotuspond/front/src/NodeList.js b/lotuspond/front/src/NodeList.js index 367fe1680..5db07bfa5 100644 --- a/lotuspond/front/src/NodeList.js +++ b/lotuspond/front/src/NodeList.js @@ -130,7 +130,8 @@ class NodeList extends React.Component { } startNode = (id) => async () => { - + let node = await this.props.client.call('Pond.RestartNode', [Number(id)]) + await this.mountNode(node) } connMgr() { diff --git a/lotuspond/spawn.go b/lotuspond/spawn.go new file mode 100644 index 000000000..2676ae119 --- /dev/null +++ b/lotuspond/spawn.go @@ -0,0 +1,218 @@ +package main + +import ( + "fmt" + "golang.org/x/xerrors" + "io" + "io/ioutil" + "os" + "os/exec" + "sync/atomic" + "time" +) + +func (api *api) Spawn() (nodeInfo, error) { + dir, err := ioutil.TempDir(os.TempDir(), "lotus-") + if err != nil { + return nodeInfo{}, err + } + + genParam := "--genesis=" + api.genesis + id := atomic.AddInt32(&api.cmds, 1) + if id == 1 { + // make genesis + genf, err := ioutil.TempFile(os.TempDir(), "lotus-genesis-") + if err != nil { + return nodeInfo{}, err + } + + api.genesis = genf.Name() + genParam = "--lotus-make-random-genesis=" + api.genesis + + if err := genf.Close(); err != nil { + return nodeInfo{}, err + } + + } + + errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nodeInfo{}, err + } + logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nodeInfo{}, err + } + + mux := newWsMux() + + cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id)) + cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) + cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) + cmd.Env = []string{"LOTUS_PATH=" + dir} + if err := cmd.Start(); err != nil { + return nodeInfo{}, err + } + + info := nodeInfo{ + Repo: dir, + ID: id, + ApiPort: 2500 + id, + State: NodeRunning, + } + + api.runningLk.Lock() + api.running[id] = &runningNode{ + cmd: cmd, + meta: info, + + mux: mux, + stop: func() { + cmd.Process.Signal(os.Interrupt) + cmd.Process.Wait() + + api.runningLk.Lock() + api.running[id].meta.State = NodeStopped + api.runningLk.Unlock() + + //logfile.Close() + //errlogfile.Close() + + //close(mux.stop) + }, + } + api.runningLk.Unlock() + + time.Sleep(time.Millisecond * 750) // TODO: Something less terrible + + return info, nil +} + +func (api *api) SpawnStorage(fullNodeRepo string) (nodeInfo, error) { + dir, err := ioutil.TempDir(os.TempDir(), "lotus-storage-") + if err != nil { + return nodeInfo{}, err + } + + errlogfile, err := os.OpenFile(dir+".err.log", os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nodeInfo{}, err + } + logfile, err := os.OpenFile(dir+".out.log", os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return nodeInfo{}, err + } + + initArgs := []string{"init"} + if fullNodeRepo == api.running[1].meta.Repo { + initArgs = []string{"init", "--actor=t0101", "--genesis-miner"} + } + + id := atomic.AddInt32(&api.cmds, 1) + cmd := exec.Command("./lotus-storage-miner", initArgs...) + cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile) + cmd.Stdout = io.MultiWriter(os.Stdout, logfile) + cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo} + if err := cmd.Run(); err != nil { + return nodeInfo{}, err + } + + time.Sleep(time.Millisecond * 300) + + mux := newWsMux() + + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw) + cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw) + cmd.Env = []string{"LOTUS_STORAGE_PATH=" + dir, "LOTUS_PATH=" + fullNodeRepo} + if err := cmd.Start(); err != nil { + return nodeInfo{}, err + } + + info := nodeInfo{ + Repo: dir, + ID: id, + ApiPort: 2500 + id, + State: NodeRunning, + + FullNode: fullNodeRepo, + Storage: true, + } + + api.runningLk.Lock() + api.running[id] = &runningNode{ + cmd: cmd, + meta: info, + + mux: mux, + stop: func() { + cmd.Process.Signal(os.Interrupt) + cmd.Process.Wait() + + api.runningLk.Lock() + api.running[id].meta.State = NodeStopped + api.runningLk.Unlock() + + //logfile.Close() + //errlogfile.Close() + + //close(mux.stop) + }, + } + api.runningLk.Unlock() + + time.Sleep(time.Millisecond * 750) // TODO: Something less terrible + + return info, nil +} + +func (api *api) RestartNode(id int32) (nodeInfo, error) { + api.runningLk.Lock() + defer api.runningLk.Unlock() + nd, ok := api.running[id] + + if !ok { + return nodeInfo{}, xerrors.New("node not found") + } + + if nd.meta.State != NodeStopped { + return nodeInfo{}, xerrors.New("node not stopped") + } + + var cmd *exec.Cmd + if nd.meta.Storage { + cmd = exec.Command("./lotus-storage-miner", "run", "--api", fmt.Sprintf("%d", 2500+id)) + } else { + cmd = exec.Command("./lotus", "daemon", "--api", fmt.Sprintf("%d", 2500+id)) + } + + cmd.Stderr = nd.cmd.Stderr // recycle old vars + cmd.Stdout = nd.cmd.Stdout + cmd.Env = nd.cmd.Env + + if err := cmd.Start(); err != nil { + return nodeInfo{}, err + } + + nd.cmd = cmd + + nd.stop = func() { + cmd.Process.Signal(os.Interrupt) + cmd.Process.Wait() + + api.runningLk.Lock() + api.running[id].meta.State = NodeStopped + api.runningLk.Unlock() + + //logfile.Close() + //errlogfile.Close() + + //close(mux.stop) + } + + nd.meta.State = NodeRunning + + time.Sleep(time.Millisecond * 750) // TODO: Something less terrible + + return nd.meta, nil +} From 215f95aa5b2417aeb0961a6bb7f0ce09b673c9d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 14:53:43 +0200 Subject: [PATCH 3/8] pond: Stopping storage nodes --- lotuspond/front/src/Address.js | 8 ++++++-- lotuspond/front/src/NodeList.js | 1 + lotuspond/front/src/StorageNode.js | 11 +++++++++-- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/lotuspond/front/src/Address.js b/lotuspond/front/src/Address.js index f3c7df73d..3b98b0f6f 100644 --- a/lotuspond/front/src/Address.js +++ b/lotuspond/front/src/Address.js @@ -27,11 +27,15 @@ class Address extends React.Component { componentDidMount() { this.refresh() if(!this.props.ts) { - let updates = setInterval(this.refresh, 2050) - this.props.client.on('close', () => clearInterval(updates)) + this.updates = setInterval(this.refresh, 2050) + this.props.client.on('close', () => clearInterval(this.updates)) } } + componentWillUnmount() { + clearInterval(this.updates) + } + async refresh() { let balance = 0 let actor = {} diff --git a/lotuspond/front/src/NodeList.js b/lotuspond/front/src/NodeList.js index 5db07bfa5..5da9f05dc 100644 --- a/lotuspond/front/src/NodeList.js +++ b/lotuspond/front/src/NodeList.js @@ -65,6 +65,7 @@ class NodeList extends React.Component { pondClient={this.props.client} fullConn={this.state.nodes[fullId].conn} mountWindow={this.props.mountWindow} + stop={this.stopNode(node.ID, onClose)} />) } }) diff --git a/lotuspond/front/src/StorageNode.js b/lotuspond/front/src/StorageNode.js index e2f2f86de..65d8388f0 100644 --- a/lotuspond/front/src/StorageNode.js +++ b/lotuspond/front/src/StorageNode.js @@ -29,6 +29,7 @@ class StorageNode extends React.Component { this.loadInfo = this.loadInfo.bind(this) this.sealGarbage = this.sealGarbage.bind(this) + this.stop = this.stop.bind(this) this.connect() } @@ -59,7 +60,8 @@ class StorageNode extends React.Component { // this.props.onConnect(client, id) // TODO: dedupe connecting part this.loadInfo() - setInterval(this.loadInfo, 1050) + let updates = setInterval(this.loadInfo, 1050) + client.on('close', () => clearInterval(updates)) }) console.log(token) // todo: use @@ -89,6 +91,10 @@ class StorageNode extends React.Component { await this.state.client.call("Filecoin.StoreGarbageData", []) } + async stop() { + await this.props.stop() + } + render() { let runtime =
if (this.state.actor) { @@ -117,7 +123,8 @@ class StorageNode extends React.Component { return + initialPosition={{x: this.props.node.ID*30, y: this.props.node.ID * 30}} + onClose={this.stop} >
{runtime} From 83f1a336a67edef921512a1c1fd6befd3dd4bb84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 16:23:08 +0200 Subject: [PATCH 4/8] node: Basic graceful shutdown --- chain/deals/handler.go | 2 +- chain/sub/incoming.go | 12 +++++++++--- cmd/lotus-storage-miner/main.go | 2 +- cmd/lotus-storage-miner/run.go | 27 ++++++++++++++++++++++++--- cmd/lotus/daemon.go | 4 ++-- cmd/lotus/main.go | 3 +-- cmd/lotus/rpc.go | 27 +++++++++++++++++++++++++-- lib/valctx/context.go | 28 ++++++++++++++++++++++++++++ node/builder.go | 10 ++++++---- node/modules/storageminer.go | 6 +++++- 10 files changed, 102 insertions(+), 19 deletions(-) create mode 100644 lib/valctx/context.go diff --git a/chain/deals/handler.go b/chain/deals/handler.go index 98f789294..112f827f5 100644 --- a/chain/deals/handler.go +++ b/chain/deals/handler.go @@ -124,7 +124,7 @@ func (h *Handler) Run(ctx context.Context) { // TODO: restore state go func() { - defer log.Error("quitting deal handler loop") + defer log.Warn("quitting deal handler loop") defer close(h.stopped) for { diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 8ff51804c..d41eac887 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,8 +2,6 @@ package sub import ( "context" - "fmt" - logging "github.com/ipfs/go-log" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -17,6 +15,10 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha for { msg, err := bsub.Next(ctx) if err != nil { + if ctx.Err() != nil { + log.Warn("quitting HandleIncomingBlocks loop") + return + } log.Error("error from block subscription: ", err) continue } @@ -55,7 +57,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub for { msg, err := msub.Next(ctx) if err != nil { - fmt.Println("error from message subscription: ", err) + log.Warn("error from message subscription: ", err) + if ctx.Err() != nil { + log.Warn("quitting HandleIncomingMessages loop") + return + } continue } diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index d91745783..131a4b42d 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -67,7 +67,7 @@ func main() { } if err := app.Run(os.Args); err != nil { - log.Error(err) + log.Warn(err) return } } diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 8696c7721..63cd3a53f 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -1,8 +1,12 @@ package main import ( + "context" + "github.com/filecoin-project/go-lotus/lib/valctx" "net/http" "os" + "os/signal" + "syscall" "github.com/multiformats/go-multiaddr" "golang.org/x/xerrors" @@ -32,7 +36,7 @@ var runCmd = &cli.Command{ if err != nil { return err } - ctx := lcli.ReqContext(cctx) + ctx := &valctx.Context{Parent: lcli.ReqContext(cctx)} go func() { // a hack for now to handle sigint @@ -61,7 +65,7 @@ var runCmd = &cli.Command{ } var minerapi api.StorageMiner - err = node.New(ctx, + stop, err := node.New(ctx, node.StorageMiner(&minerapi), node.Online(), node.Repo(r), @@ -101,6 +105,23 @@ var runCmd = &cli.Command{ } http.Handle("/rpc/v0", ah) - return http.ListenAndServe("127.0.0.1:"+cctx.String("api"), http.DefaultServeMux) + + srv := &http.Server{Addr: "127.0.0.1:" + cctx.String("api"), Handler: http.DefaultServeMux} + + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + log.Warn("Shutting down..") + if err := stop(context.TODO()); err != nil { + log.Errorf("graceful shutting down failed: %s", err) + } + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + return srv.ListenAndServe() }, } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 2fc4e38b4..fa287ab77 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -65,7 +65,7 @@ var DaemonCmd = &cli.Command{ } var api api.FullNode - err = node.New(ctx, + stop, err := node.New(ctx, node.FullAPI(&api), node.Online(), @@ -86,6 +86,6 @@ var DaemonCmd = &cli.Command{ } // TODO: properly parse api endpoint (or make it a URL) - return serveRPC(api, "127.0.0.1:"+cctx.String("api")) + return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api")) }, } diff --git a/cmd/lotus/main.go b/cmd/lotus/main.go index 3b6c194d8..894ca0b09 100644 --- a/cmd/lotus/main.go +++ b/cmd/lotus/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "log" "os" logging "github.com/ipfs/go-log" @@ -65,7 +64,7 @@ func main() { Code: trace.StatusCodeFailedPrecondition, Message: err.Error(), }) - log.Printf("%+v\n", err) + log.Warn(err) } return } diff --git a/cmd/lotus/rpc.go b/cmd/lotus/rpc.go index 68ac13c87..358e4cd3e 100644 --- a/cmd/lotus/rpc.go +++ b/cmd/lotus/rpc.go @@ -1,14 +1,22 @@ package main import ( + "context" "net/http" + "os" + "os/signal" + "syscall" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/lib/auth" "github.com/filecoin-project/go-lotus/lib/jsonrpc" + "github.com/filecoin-project/go-lotus/node" + logging "github.com/ipfs/go-log" ) -func serveRPC(a api.FullNode, addr string) error { +var log = logging.Logger("main") + +func serveRPC(a api.FullNode, stop node.StopFunc, addr string) error { rpcServer := jsonrpc.NewServer() rpcServer.Register("Filecoin", api.PermissionedFullAPI(a)) @@ -18,5 +26,20 @@ func serveRPC(a api.FullNode, addr string) error { } http.Handle("/rpc/v0", ah) - return http.ListenAndServe(addr, http.DefaultServeMux) + + srv := &http.Server{Addr: addr, Handler: http.DefaultServeMux} + + sigChan := make(chan os.Signal, 2) + go func() { + <-sigChan + if err := stop(context.TODO()); err != nil { + log.Errorf("graceful shutting down failed: %s", err) + } + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + }() + signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) + + return srv.ListenAndServe() } diff --git a/lib/valctx/context.go b/lib/valctx/context.go new file mode 100644 index 000000000..82983cc6e --- /dev/null +++ b/lib/valctx/context.go @@ -0,0 +1,28 @@ +package valctx + +import ( + "context" + "time" +) + +type Context struct { + Parent context.Context +} + +func (c *Context) Deadline() (deadline time.Time, ok bool) { + return +} + +func (c *Context) Done() <-chan struct{} { + return nil +} + +func (c *Context) Err() error { + return nil +} + +func (c *Context) Value(key interface{}) interface{} { + return c.Parent.Value(key) +} + +var _ context.Context = &Context{} diff --git a/node/builder.go b/node/builder.go index 70e489285..db0ee64f0 100644 --- a/node/builder.go +++ b/node/builder.go @@ -337,8 +337,10 @@ func FullAPI(out *api.FullNode) Option { } } +type StopFunc func(context.Context) error + // New builds and starts new Filecoin node -func New(ctx context.Context, opts ...Option) error { +func New(ctx context.Context, opts ...Option) (StopFunc, error) { settings := Settings{ modules: map[interface{}]fx.Option{}, invokes: make([]fx.Option, _nInvokes), @@ -346,7 +348,7 @@ func New(ctx context.Context, opts ...Option) error { // apply module options in the right order if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil { - return err + return nil, err } // gather constructors for fx.Options @@ -374,10 +376,10 @@ func New(ctx context.Context, opts ...Option) error { // correctly if err := app.Start(ctx); err != nil { // comment fx.NopLogger few lines above for easier debugging - return err + return nil, err } - return nil + return app.Stop, nil } // In-memory / testing diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 830dcdf97..83e507e55 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -147,9 +147,13 @@ func RegisterMiner(lc fx.Lifecycle, ds dtypes.MetadataDS, api api.FullNode) erro lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - log.Infof("registering miner '%s' with full node", minerAddr) + log.Infof("Registering miner '%s' with full node", minerAddr) return api.MinerRegister(ctx, minerAddr) }, + OnStop: func(ctx context.Context) error { + log.Infof("Unregistering miner '%s' from full node", minerAddr) + return api.MinerUnregister(ctx, minerAddr) + }, }) return nil } From f2ecb772fa3a7d86e16ef3d9b514c0725c4f15f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 16:34:22 +0200 Subject: [PATCH 5/8] repo: Close datastore in Close --- cmd/lotus/daemon.go | 7 +++---- lib/jsonrpc/client.go | 2 +- node/repo/fsrepo.go | 5 +++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index fa287ab77..627a164f9 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -6,14 +6,13 @@ import ( "context" "io/ioutil" - "github.com/filecoin-project/go-lotus/api" - "github.com/filecoin-project/go-lotus/node/modules" - "github.com/filecoin-project/go-lotus/node/modules/testing" - "github.com/multiformats/go-multiaddr" "gopkg.in/urfave/cli.v2" + "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/node" + "github.com/filecoin-project/go-lotus/node/modules" + "github.com/filecoin-project/go-lotus/node/modules/testing" "github.com/filecoin-project/go-lotus/node/repo" ) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 73c09e0ca..628cbc8ad 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -57,7 +57,7 @@ type clientRequest struct { // ClientCloser is used to close Client from further use type ClientCloser func() -// NewClient creates new josnrpc 2.0 client +// NewClient creates new jsonrpc 2.0 client // // handler must be pointer to a struct with function fields // Returned value closes the client connection diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index abe232b60..ad3a43388 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -181,6 +181,11 @@ func (fsr *fsLockedRepo) Close() error { if err != nil && !os.IsNotExist(err) { return xerrors.Errorf("could not remove API file: %w", err) } + if fsr.ds != nil { + if err := fsr.ds.Close(); err != nil { + return xerrors.Errorf("could not close datastore: %w", err) + } + } err = fsr.closer.Close() fsr.closer = nil From 7f68fa95679bf670400cfe5c43c5c6a9ee68b35d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 16:45:09 +0200 Subject: [PATCH 6/8] Fix tests --- chain/sync_test.go | 6 ++++-- node/node_test.go | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/chain/sync_test.go b/chain/sync_test.go index b81facd48..75247493d 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -127,7 +127,8 @@ func (tu *syncTestUtil) addSourceNode(gen int) { sourceRepo, genesis, blocks := tu.repoWithChain(tu.t, gen) var out api.FullNode - err := node.New(tu.ctx, + // TODO: Don't ignore stop + _, err := node.New(tu.ctx, node.FullAPI(&out), node.Online(), node.Repo(sourceRepo), @@ -149,7 +150,8 @@ func (tu *syncTestUtil) addClientNode() int { var out api.FullNode - err := node.New(tu.ctx, + // TODO: Don't ignore stop + _, err := node.New(tu.ctx, node.FullAPI(&out), node.Online(), node.Repo(repo.NewMemory(nil)), diff --git a/node/node_test.go b/node/node_test.go index f120d508e..72d577145 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -36,7 +36,8 @@ func builder(t *testing.T, n int) []api.FullNode { } var err error - err = node.New(ctx, + // TODO: Don't ignore stop + _, err = node.New(ctx, node.FullAPI(&out[i]), node.Online(), node.Repo(repo.NewMemory(nil)), From 037fa84e68c31e5f43c6bef2aa95745a03965a2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 20:28:21 +0200 Subject: [PATCH 7/8] storageminer: Remove node shutdown hack --- cmd/lotus-storage-miner/run.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 63cd3a53f..3fb6ab191 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -38,13 +38,6 @@ var runCmd = &cli.Command{ } ctx := &valctx.Context{Parent: lcli.ReqContext(cctx)} - go func() { - // a hack for now to handle sigint - - <-ctx.Done() - os.Exit(0) - }() - v, err := nodeApi.Version(ctx) if err != nil { return err From 5e2c100f4d766c28b23da3b7e303c6606cd0c64f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Sep 2019 20:36:06 +0200 Subject: [PATCH 8/8] storageminer: Better context handling --- cli/cmd.go | 22 +++++++++------------- cmd/lotus-storage-miner/run.go | 3 +-- lib/valctx/context.go | 28 ---------------------------- 3 files changed, 10 insertions(+), 43 deletions(-) delete mode 100644 lib/valctx/context.go diff --git a/cli/cmd.go b/cli/cmd.go index c869e30af..afeec86b9 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -21,7 +21,6 @@ var log = logging.Logger("cli") const ( metadataTraceConetxt = "traceContext" - metadataContext = "context" ) // ApiConnector returns API instance @@ -85,22 +84,19 @@ func GetStorageMinerAPI(ctx *cli.Context) (api.StorageMiner, error) { return client.NewStorageMinerRPC(addr, headers) } +func DaemonContext(cctx *cli.Context) context.Context { + if mtCtx, ok := cctx.App.Metadata[metadataTraceConetxt]; ok { + return mtCtx.(context.Context) + } + + return context.Background() +} + // ReqContext returns context for cli execution. Calling it for the first time // installs SIGTERM handler that will close returned context. // Not safe for concurrent execution. func ReqContext(cctx *cli.Context) context.Context { - if uctx, ok := cctx.App.Metadata[metadataContext]; ok { - // unchecked cast as if something else is in there - // it is crash worthy either way - return uctx.(context.Context) - } - var tCtx context.Context - - if mtCtx, ok := cctx.App.Metadata[metadataTraceConetxt]; ok { - tCtx = mtCtx.(context.Context) - } else { - tCtx = context.Background() - } + tCtx := DaemonContext(cctx) ctx, done := context.WithCancel(tCtx) sigChan := make(chan os.Signal, 2) diff --git a/cmd/lotus-storage-miner/run.go b/cmd/lotus-storage-miner/run.go index 3fb6ab191..026decf39 100644 --- a/cmd/lotus-storage-miner/run.go +++ b/cmd/lotus-storage-miner/run.go @@ -2,7 +2,6 @@ package main import ( "context" - "github.com/filecoin-project/go-lotus/lib/valctx" "net/http" "os" "os/signal" @@ -36,7 +35,7 @@ var runCmd = &cli.Command{ if err != nil { return err } - ctx := &valctx.Context{Parent: lcli.ReqContext(cctx)} + ctx := lcli.DaemonContext(cctx) v, err := nodeApi.Version(ctx) if err != nil { diff --git a/lib/valctx/context.go b/lib/valctx/context.go deleted file mode 100644 index 82983cc6e..000000000 --- a/lib/valctx/context.go +++ /dev/null @@ -1,28 +0,0 @@ -package valctx - -import ( - "context" - "time" -) - -type Context struct { - Parent context.Context -} - -func (c *Context) Deadline() (deadline time.Time, ok bool) { - return -} - -func (c *Context) Done() <-chan struct{} { - return nil -} - -func (c *Context) Err() error { - return nil -} - -func (c *Context) Value(key interface{}) interface{} { - return c.Parent.Value(key) -} - -var _ context.Context = &Context{}