Merge pull request #322 from filecoin-project/feat/townhall

Lotus Townhall
This commit is contained in:
Łukasz Magiera 2019-10-11 03:11:16 +02:00 committed by GitHub
commit 1c98501112
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 369 additions and 11 deletions

View File

@ -0,0 +1,87 @@
package metrics
import (
"context"
"encoding/json"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
)
var log = logging.Logger("metrics")
const baseTopic = "/fil/headnotifs/"
type Update struct {
Type string
}
func SendHeadNotifs(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
gen, err := chain.Chain.GetGenesis()
if err != nil {
return err
}
topic := baseTopic + gen.Cid().String()
go func() {
if err := sendHeadNotifs(ctx, ps, topic, chain); err != nil {
log.Error("consensus metrics error", err)
return
}
}()
go func() {
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
defer sub.Cancel()
for {
if _, err := sub.Next(ctx); err != nil {
return
}
}
}()
return nil
},
})
return nil
}
func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := chain.ChainNotify(ctx)
if err != nil {
return err
}
for {
select {
case notif := <-notifs:
n := notif[len(notif)-1]
b, err := json.Marshal(n.Val)
if err != nil {
return err
}
if err := ps.Publish(topic, b); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
}

View File

@ -45,6 +45,7 @@ type ChainStore struct {
tstLk sync.Mutex
tipsets map[uint64][]cid.Cid
reorgCh chan<- reorg
headChangeNotifs []func(rev, app []*types.TipSet) error
}
@ -56,6 +57,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
tipsets: make(map[uint64][]cid.Cid),
}
cs.reorgCh = cs.reorgWorker(context.TODO())
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
@ -217,17 +220,46 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
return nil
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil {
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
type reorg struct {
old *types.TipSet
new *types.TipSet
}
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
out := make(chan reorg, 32)
go func() {
defer log.Warn("reorgWorker quit")
for {
select {
case r := <-out:
revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil {
log.Error("computing reorg ops failed: ", err)
continue
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err)
}
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
}
cs.reorgCh <- reorg{
old: cs.heaviest,
new: ts,
}
} else {
log.Warn("no heaviest tipset found, using %s", ts.Cids())
}

108
cmd/lotus-townhall/main.go Normal file
View File

@ -0,0 +1,108 @@
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/filecoin-project/go-lotus/lib/addrutil"
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
)
const topic = "/fil/headnotifs/bafy2bzacedjqrkfbuafakygo6vlkrqozvsju2d5k6g24ry3mjjfxwrvet2636"
var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
ctx := context.Background()
protec, err := pnet.NewProtector(strings.NewReader(lp2p.LotusKey))
if err != nil {
panic(err)
}
host, err := libp2p.New(
ctx,
libp2p.Defaults,
libp2p.PrivateNetwork(protec),
)
if err != nil {
panic(err)
}
ps, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
panic(err)
}
pi, err := addrutil.ParseAddresses(ctx, []string{
"/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWAShT7qd3oM7QPC8AsQffs6ThH69fZGui4xCW68E35rDP",
})
if err != nil {
panic(err)
}
if err := host.Connect(ctx, pi[0]); err != nil {
panic(err)
}
http.HandleFunc("/sub", handler(ps))
fmt.Println("listening")
if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil {
panic(err)
}
}
type update struct {
From peer.ID
Update json.RawMessage
}
func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
for {
msg, err := sub.Next(r.Context())
if err != nil {
return
}
fmt.Println(msg)
if err := conn.WriteJSON(update{
From: peer.ID(msg.From),
Update: msg.Data,
}); err != nil {
return
}
}
}
}

23
cmd/lotus-townhall/townhall/.gitignore vendored Normal file
View File

@ -0,0 +1,23 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
# dependencies
/node_modules
/.pnp
.pnp.js
# testing
/coverage
# production
/build
# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local
npm-debug.log*
yarn-debug.log*
yarn-error.log*

View File

@ -0,0 +1,31 @@
{
"name": "townhall",
"version": "0.1.0",
"private": true,
"dependencies": {
"react": "^16.10.2",
"react-dom": "^16.10.2",
"react-scripts": "3.2.0"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": "react-app"
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}

View File

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="theme-color" content="#1a1a1a" />
<title>Lotus TownHall</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<div id="root"></div>
</body>
</html>

View File

@ -0,0 +1,2 @@
# https://www.robotstxt.org/robotstxt.html
User-agent: *

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,37 @@
import React from 'react';
import './App.css';
class App extends React.Component {
constructor(props) {
super(props);
//let ws = new WebSocket("ws://" + window.location.host + "/sub")
let ws = new WebSocket("ws://127.0.0.1:2975/sub")
ws.onmessage = (ev) => {
console.log(ev)
let update = JSON.parse(ev.data)
this.setState( prev => ({
...prev, [update.From]: update.Update,
}))
}
this.state = {}
}
render() {
let best = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Height ? p : n.Height, -1)
console.log(best)
return Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => {
let l = <span>{k} {v.Height}</span>
if(best !== v.Height) {
l = <span style={{color: '#f00'}}>{l}</span>
}
return <div>{l}</div>
})
}
}
export default App;

View File

@ -0,0 +1,9 @@
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';
it('renders without crashing', () => {
const div = document.createElement('div');
ReactDOM.render(<App />, div);
ReactDOM.unmountComponentAtNode(div);
});

View File

@ -0,0 +1,6 @@
body {
margin: 0;
font-family: monospace;
background: #1f1f1f;
color: #f0f0f0;
}

View File

@ -0,0 +1,6 @@
import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
ReactDOM.render(<App />, document.getElementById('root'));

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/metrics"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -90,6 +91,7 @@ const (
// daemon
ExtractApiKey
HeadMetricsKey
SetApiEndpointKey
@ -225,6 +227,7 @@ func Online() Option {
Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(HeadMetricsKey, metrics.SendHeadNotifs),
Override(new(*discovery.Local), discovery.NewLocal),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),

View File

@ -8,12 +8,12 @@ import (
pnet "github.com/libp2p/go-libp2p-pnet"
)
var lotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e"
var LotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e"
type PNetFingerprint []byte
func PNet() (opts Libp2pOpts, fp PNetFingerprint, err error) {
protec, err := pnet.NewProtector(strings.NewReader(lotusKey))
protec, err := pnet.NewProtector(strings.NewReader(LotusKey))
if err != nil {
return opts, nil, fmt.Errorf("failed to configure private network: %s", err)
}