diff --git a/chain/metrics/consensus.go b/chain/metrics/consensus.go new file mode 100644 index 000000000..b04d5cf28 --- /dev/null +++ b/chain/metrics/consensus.go @@ -0,0 +1,87 @@ +package metrics + +import ( + "context" + "encoding/json" + logging "github.com/ipfs/go-log" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" + + "github.com/filecoin-project/go-lotus/node/impl/full" + "github.com/filecoin-project/go-lotus/node/modules/helpers" +) + +var log = logging.Logger("metrics") + +const baseTopic = "/fil/headnotifs/" + +type Update struct { + Type string +} + +func SendHeadNotifs(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error { + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + gen, err := chain.Chain.GetGenesis() + if err != nil { + return err + } + + topic := baseTopic + gen.Cid().String() + + go func() { + if err := sendHeadNotifs(ctx, ps, topic, chain); err != nil { + log.Error("consensus metrics error", err) + return + } + }() + go func() { + sub, err := ps.Subscribe(topic) + if err != nil { + return + } + defer sub.Cancel() + + for { + if _, err := sub.Next(ctx); err != nil { + return + } + } + + }() + return nil + }, + }) + + return nil +} + +func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + notifs, err := chain.ChainNotify(ctx) + if err != nil { + return err + } + + for { + select { + case notif := <-notifs: + n := notif[len(notif)-1] + + b, err := json.Marshal(n.Val) + if err != nil { + return err + } + + if err := ps.Publish(topic, b); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + } +} diff --git a/chain/store/store.go b/chain/store/store.go index c8ba63527..5e9e17f03 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -45,6 +45,7 @@ type ChainStore struct { tstLk sync.Mutex tipsets map[uint64][]cid.Cid + reorgCh chan<- reorg headChangeNotifs []func(rev, app []*types.TipSet) error } @@ -56,6 +57,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { tipsets: make(map[uint64][]cid.Cid), } + cs.reorgCh = cs.reorgWorker(context.TODO()) + hcnf := func(rev, app []*types.TipSet) error { cs.pubLk.Lock() defer cs.pubLk.Unlock() @@ -217,17 +220,46 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error { return nil } -func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { - if cs.heaviest != nil { - revert, apply, err := cs.ReorgOps(cs.heaviest, ts) - if err != nil { - return errors.Wrap(err, "computing reorg ops failed") - } - for _, hcf := range cs.headChangeNotifs { - if err := hcf(revert, apply); err != nil { - return errors.Wrap(err, "head change func errored (BAD)") +type reorg struct { + old *types.TipSet + new *types.TipSet +} + +func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { + out := make(chan reorg, 32) + go func() { + defer log.Warn("reorgWorker quit") + + for { + select { + case r := <-out: + revert, apply, err := cs.ReorgOps(r.old, r.new) + if err != nil { + log.Error("computing reorg ops failed: ", err) + continue + } + for _, hcf := range cs.headChangeNotifs { + if err := hcf(revert, apply); err != nil { + log.Error("head change func errored (BAD): ", err) + } + } + case <-ctx.Done(): + return } } + }() + return out +} + +func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { + if cs.heaviest != nil { // buf + if len(cs.reorgCh) > 0 { + log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh)) + } + cs.reorgCh <- reorg{ + old: cs.heaviest, + new: ts, + } } else { log.Warn("no heaviest tipset found, using %s", ts.Cids()) } diff --git a/cmd/lotus-townhall/main.go b/cmd/lotus-townhall/main.go new file mode 100644 index 000000000..64b7f423c --- /dev/null +++ b/cmd/lotus-townhall/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/gorilla/websocket" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" + pnet "github.com/libp2p/go-libp2p-pnet" + pubsub "github.com/libp2p/go-libp2p-pubsub" + + "github.com/filecoin-project/go-lotus/lib/addrutil" + "github.com/filecoin-project/go-lotus/node/modules/lp2p" +) + +const topic = "/fil/headnotifs/bafy2bzacedjqrkfbuafakygo6vlkrqozvsju2d5k6g24ry3mjjfxwrvet2636" + +var upgrader = websocket.Upgrader{ + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func main() { + ctx := context.Background() + + protec, err := pnet.NewProtector(strings.NewReader(lp2p.LotusKey)) + if err != nil { + panic(err) + } + + host, err := libp2p.New( + ctx, + libp2p.Defaults, + libp2p.PrivateNetwork(protec), + ) + if err != nil { + panic(err) + } + ps, err := pubsub.NewGossipSub(ctx, host) + if err != nil { + panic(err) + } + + pi, err := addrutil.ParseAddresses(ctx, []string{ + "/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWAShT7qd3oM7QPC8AsQffs6ThH69fZGui4xCW68E35rDP", + }) + if err != nil { + panic(err) + } + + if err := host.Connect(ctx, pi[0]); err != nil { + panic(err) + } + + http.HandleFunc("/sub", handler(ps)) + + fmt.Println("listening") + + if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil { + panic(err) + } +} + +type update struct { + From peer.ID + Update json.RawMessage +} + +func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + if r.Header.Get("Sec-WebSocket-Protocol") != "" { + w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + sub, err := ps.Subscribe(topic) + if err != nil { + return + } + + for { + msg, err := sub.Next(r.Context()) + if err != nil { + return + } + + fmt.Println(msg) + + if err := conn.WriteJSON(update{ + From: peer.ID(msg.From), + Update: msg.Data, + }); err != nil { + return + } + } + } +} diff --git a/cmd/lotus-townhall/townhall/.gitignore b/cmd/lotus-townhall/townhall/.gitignore new file mode 100644 index 000000000..4d29575de --- /dev/null +++ b/cmd/lotus-townhall/townhall/.gitignore @@ -0,0 +1,23 @@ +# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. + +# dependencies +/node_modules +/.pnp +.pnp.js + +# testing +/coverage + +# production +/build + +# misc +.DS_Store +.env.local +.env.development.local +.env.test.local +.env.production.local + +npm-debug.log* +yarn-debug.log* +yarn-error.log* diff --git a/cmd/lotus-townhall/townhall/package.json b/cmd/lotus-townhall/townhall/package.json new file mode 100644 index 000000000..5a8167622 --- /dev/null +++ b/cmd/lotus-townhall/townhall/package.json @@ -0,0 +1,31 @@ +{ + "name": "townhall", + "version": "0.1.0", + "private": true, + "dependencies": { + "react": "^16.10.2", + "react-dom": "^16.10.2", + "react-scripts": "3.2.0" + }, + "scripts": { + "start": "react-scripts start", + "build": "react-scripts build", + "test": "react-scripts test", + "eject": "react-scripts eject" + }, + "eslintConfig": { + "extends": "react-app" + }, + "browserslist": { + "production": [ + ">0.2%", + "not dead", + "not op_mini all" + ], + "development": [ + "last 1 chrome version", + "last 1 firefox version", + "last 1 safari version" + ] + } +} diff --git a/cmd/lotus-townhall/townhall/public/index.html b/cmd/lotus-townhall/townhall/public/index.html new file mode 100644 index 000000000..38af10597 --- /dev/null +++ b/cmd/lotus-townhall/townhall/public/index.html @@ -0,0 +1,13 @@ + + +
+ + + +