diff --git a/api/api.go b/api/api.go index 1ea58cfdf..1dff49e13 100644 --- a/api/api.go +++ b/api/api.go @@ -54,12 +54,14 @@ type FullNode interface { ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) + ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error) ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) ChainGetParentMessages(context.Context, cid.Cid) ([]Message, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainSetHead(context.Context, *types.TipSet) error + ChainGetGenesis(context.Context) (*types.TipSet, error) // syncer SyncState(context.Context) (*SyncState, error) diff --git a/api/struct.go b/api/struct.go index ab61ef39e..1a6068dc4 100644 --- a/api/struct.go +++ b/api/struct.go @@ -42,12 +42,14 @@ type FullNodeStruct struct { ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` + ChainGetTipSet func(context.Context, []cid.Cid) (*types.TipSet, error) `perm:"read"` ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` ChainGetParentMessages func(context.Context, cid.Cid) ([]Message, error) `perm:"read"` ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"` ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"` ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"` + ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"` SyncState func(context.Context) (*SyncState, error) `perm:"read"` @@ -284,6 +286,10 @@ func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*types.B return c.Internal.ChainGetBlock(ctx, b) } +func (c *FullNodeStruct) ChainGetTipSet(ctx context.Context, cids []cid.Cid) (*types.TipSet, error) { + return c.Internal.ChainGetTipSet(ctx, cids) +} + func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) (*BlockMessages, error) { return c.Internal.ChainGetBlockMessages(ctx, b) } @@ -308,6 +314,10 @@ func (c *FullNodeStruct) ChainSetHead(ctx context.Context, ts *types.TipSet) err return c.Internal.ChainSetHead(ctx, ts) } +func (c *FullNodeStruct) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) { + return c.Internal.ChainGetGenesis(ctx) +} + func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) { return c.Internal.SyncState(ctx) } diff --git a/chain/gen/mining.go b/chain/gen/mining.go index 0aa83c24c..bead86d52 100644 --- a/chain/gen/mining.go +++ b/chain/gen/mining.go @@ -20,7 +20,7 @@ import ( ) func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wallet, miner address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, timestamp uint64) (*types.FullBlock, error) { - st, recpts, err := sm.TipSetState(parents) + st, recpts, err := sm.TipSetState(ctx, parents) if err != nil { return nil, errors.Wrap(err, "failed to load tipset state") } diff --git a/chain/metrics/consensus.go b/chain/metrics/consensus.go new file mode 100644 index 000000000..297a28f28 --- /dev/null +++ b/chain/metrics/consensus.go @@ -0,0 +1,112 @@ +package metrics + +import ( + "context" + "encoding/json" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" + + "github.com/filecoin-project/go-lotus/chain/types" + "github.com/filecoin-project/go-lotus/node/impl/full" + "github.com/filecoin-project/go-lotus/node/modules/helpers" +) + +var log = logging.Logger("metrics") + +const baseTopic = "/fil/headnotifs/" + +type Update struct { + Type string +} + +func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error { + ctx := helpers.LifecycleCtx(mctx, lc) + + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + gen, err := chain.Chain.GetGenesis() + if err != nil { + return err + } + + topic := baseTopic + gen.Cid().String() + + go func() { + if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil { + log.Error("consensus metrics error", err) + return + } + }() + go func() { + sub, err := ps.Subscribe(topic) + if err != nil { + return + } + defer sub.Cancel() + + for { + if _, err := sub.Next(ctx); err != nil { + return + } + } + + }() + return nil + }, + }) + + return nil + } +} + +type message struct { + // TipSet + Cids []cid.Cid + Blocks []*types.BlockHeader + Height uint64 + Weight types.BigInt + + // Meta + + NodeName string +} + +func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + notifs, err := chain.ChainNotify(ctx) + if err != nil { + return err + } + + for { + select { + case notif := <-notifs: + n := notif[len(notif)-1] + + m := message{ + Cids: n.Val.Cids(), + Blocks: n.Val.Blocks(), + Height: n.Val.Height(), + Weight: n.Val.Weight(), + NodeName: nickname, + } + + b, err := json.Marshal(m) + if err != nil { + return err + } + + if err := ps.Publish(topic, b); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + } +} diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 6405482f0..9a70cbd0a 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-cid" hamt "github.com/ipfs/go-hamt-ipld" logging "github.com/ipfs/go-log" + "go.opencensus.io/trace" ) var log = logging.Logger("statemgr") @@ -45,8 +46,7 @@ func cidsToKey(cids []cid.Cid) string { return out } -func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error) { - ctx := context.TODO() +func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) { ck := cidsToKey(ts.Cids()) sm.stlk.Lock() @@ -76,6 +76,9 @@ func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error) } func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.BlockHeader, cb func(cid.Cid, *types.Message, *vm.ApplyRet) error) (cid.Cid, cid.Cid, error) { + ctx, span := trace.StartSpan(ctx, "computeTipSetState") + defer span.End() + pstate := blks[0].ParentStateRoot cids := make([]cid.Cid, len(blks)) @@ -248,7 +251,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad ts = sm.cs.GetHeaviestTipSet() } - st, _, err := sm.TipSetState(ts) + st, _, err := sm.TipSetState(ctx, ts) if err != nil { return address.Undef, xerrors.Errorf("resolve address failed to get tipset state: %w", err) } diff --git a/chain/store/store.go b/chain/store/store.go index c8ba63527..ba710ee63 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -45,6 +45,7 @@ type ChainStore struct { tstLk sync.Mutex tipsets map[uint64][]cid.Cid + reorgCh chan<- reorg headChangeNotifs []func(rev, app []*types.TipSet) error } @@ -56,6 +57,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore { tipsets: make(map[uint64][]cid.Cid), } + cs.reorgCh = cs.reorgWorker(context.TODO()) + hcnf := func(rev, app []*types.TipSet) error { cs.pubLk.Lock() defer cs.pubLk.Unlock() @@ -217,17 +220,46 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error { return nil } -func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { - if cs.heaviest != nil { - revert, apply, err := cs.ReorgOps(cs.heaviest, ts) - if err != nil { - return errors.Wrap(err, "computing reorg ops failed") - } - for _, hcf := range cs.headChangeNotifs { - if err := hcf(revert, apply); err != nil { - return errors.Wrap(err, "head change func errored (BAD)") +type reorg struct { + old *types.TipSet + new *types.TipSet +} + +func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg { + out := make(chan reorg, 32) + go func() { + defer log.Warn("reorgWorker quit") + + for { + select { + case r := <-out: + revert, apply, err := cs.ReorgOps(r.old, r.new) + if err != nil { + log.Error("computing reorg ops failed: ", err) + continue + } + for _, hcf := range cs.headChangeNotifs { + if err := hcf(revert, apply); err != nil { + log.Error("head change func errored (BAD): ", err) + } + } + case <-ctx.Done(): + return } } + }() + return out +} + +func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error { + if cs.heaviest != nil { // buf + if len(cs.reorgCh) > 0 { + log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh)) + } + cs.reorgCh <- reorg{ + old: cs.heaviest, + new: ts, + } } else { log.Warn("no heaviest tipset found, using %s", ts.Cids()) } @@ -693,7 +725,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets } lt := int64(len(tickets)) if lb < lt { - log.Desugar().Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug", zap.Stack("call-stack")) + log.Desugar().Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug", zap.Stack("stacktrace")) t := tickets[lt-(1+lb)] diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b7fd6d700..ab03c999f 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -44,7 +44,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha return } - log.Infof("inform new block over pubsub: %s from %s", blk.Header.Cid(), msg.GetFrom()) + log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom()) s.InformNewBlock(msg.GetFrom(), &types.FullBlock{ Header: blk.Header, BlsMessages: bmsgs, diff --git a/chain/sync.go b/chain/sync.go index db7da3e6b..4ff9ad54f 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -24,6 +24,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/peer" cbg "github.com/whyrusleeping/cbor-gen" + "go.opencensus.io/trace" "golang.org/x/xerrors" ) @@ -87,6 +88,7 @@ const BootstrapPeerThreshold = 1 // This should be called when connecting to new peers, and additionally // when receiving new blocks from the network func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { + ctx := context.Background() if fts == nil { panic("bad") } @@ -102,7 +104,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { // TODO: this is kindof a hack... log.Info("got block from ourselves") - if err := syncer.Sync(fts.TipSet()); err != nil { + if err := syncer.Sync(ctx, fts.TipSet()); err != nil { log.Errorf("failed to sync our own block %s: %+v", fts.TipSet().Cids(), err) } @@ -114,7 +116,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) { syncer.Bsync.AddPeer(from) go func() { - if err := syncer.Sync(fts.TipSet()); err != nil { + if err := syncer.Sync(ctx, fts.TipSet()); err != nil { log.Errorf("sync error: %+v", err) } }() @@ -327,9 +329,10 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro return fts, nil } -func (syncer *Syncer) Sync(maybeHead *types.TipSet) error { +func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { + ctx, span := trace.StartSpan(ctx, "chain.Sync") + defer span.End() - ctx := context.TODO() syncer.syncLock.Lock() defer syncer.syncLock.Unlock() @@ -420,6 +423,7 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre // Should match up with 'Semantical Validation' in validation.md in the spec func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error { + h := b.Header baseTs, err := syncer.store.LoadTipSet(h.Parents) @@ -427,7 +431,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err) } - stateroot, precp, err := syncer.sm.TipSetState(baseTs) + stateroot, precp, err := syncer.sm.TipSetState(ctx, baseTs) if err != nil { return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err) } @@ -594,7 +598,7 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu var bsig bls.Signature copy(bsig[:], sig.Data) - if !bls.Verify(bsig, digests, pubks) { + if !bls.Verify(&bsig, digests, pubks) { return xerrors.New("bls aggregate signature failed to verify") } @@ -720,11 +724,11 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type return nil, xerrors.Errorf("fork was longer than our threshold") } -func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error { +func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error { syncer.syncState.SetHeight(0) return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error { log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids())) - if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil { + if err := syncer.ValidateTipSet(ctx, fts); err != nil { log.Errorf("failed to validate tipset: %+v", err) return xerrors.Errorf("message processing failed: %w", err) } @@ -847,11 +851,12 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error syncer.syncState.SetStage(api.StageMessages) - if err := syncer.syncMessagesAndCheckState(headers); err != nil { + if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil { return xerrors.Errorf("collectChain syncMessages: %w", err) } syncer.syncState.SetStage(api.StageSyncComplete) + log.Infow("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids())) return nil } diff --git a/chain/types/logs.go b/chain/types/logs.go new file mode 100644 index 000000000..e9a69e9d5 --- /dev/null +++ b/chain/types/logs.go @@ -0,0 +1,17 @@ +package types + +import ( + "github.com/ipfs/go-cid" + "go.uber.org/zap/zapcore" +) + +type LogCids []cid.Cid + +var _ zapcore.ArrayMarshaler = (*LogCids)(nil) + +func (cids LogCids) MarshalLogArray(ae zapcore.ArrayEncoder) error { + for _, c := range cids { + ae.AppendString(c.String()) + } + return nil +} diff --git a/chain/types/signature_cgo.go b/chain/types/signature_cgo.go index e811a0f24..5d60151a6 100644 --- a/chain/types/signature_cgo.go +++ b/chain/types/signature_cgo.go @@ -44,7 +44,7 @@ func (s *Signature) Verify(addr address.Address, msg []byte) error { var sig bls.Signature copy(sig[:], s.Data) - if !bls.Verify(sig, digests, pubkeys) { + if !bls.Verify(&sig, digests, pubkeys) { return fmt.Errorf("bls signature failed to verify") } diff --git a/chain/vm/vm.go b/chain/vm/vm.go index dc6deae20..3498c9f69 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -430,6 +430,13 @@ func checkMessage(msg *types.Message) error { func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage") defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes( + trace.StringAttribute("to", msg.To.String()), + trace.Int64Attribute("method", int64(msg.Method)), + trace.StringAttribute("value", msg.Value.String()), + ) + } if err := checkMessage(msg); err != nil { return nil, err @@ -605,6 +612,7 @@ func (vm *VM) SetBlockHeight(h uint64) { func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params []byte) ([]byte, aerrors.ActorError) { ctx, span := trace.StartSpan(vmctx.ctx, "vm.Invoke") defer span.End() + var oldCtx context.Context oldCtx, vmctx.ctx = vmctx.ctx, ctx defer func() { diff --git a/cli/chain.go b/cli/chain.go index 13e99ff95..e16fadef7 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -22,6 +22,7 @@ var chainCmd = &cli.Command{ chainReadObjCmd, chainGetMsgCmd, chainSetHeadCmd, + chainListCmd, }, } @@ -210,6 +211,12 @@ var chainGetMsgCmd = &cli.Command{ var chainSetHeadCmd = &cli.Command{ Name: "sethead", Usage: "manually set the local nodes head tipset (Caution: normally only used for recovery)", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "genesis", + Usage: "reset head to genesis", + }, + }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { @@ -218,13 +225,25 @@ var chainSetHeadCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - if !cctx.Args().Present() { + gen := cctx.Bool("genesis") + + if !cctx.Args().Present() && !gen { return fmt.Errorf("must pass cids for tipset to set as head") } - ts, err := parseTipSet(api, ctx, cctx.Args().Slice()) - if err != nil { - return err + var ts *types.TipSet + if gen { + gents, err := api.ChainGetGenesis(ctx) + if err != nil { + return err + } + ts = gents + } else { + parsedts, err := parseTipSet(api, ctx, cctx.Args().Slice()) + if err != nil { + return err + } + ts = parsedts } if err := api.ChainSetHead(ctx, ts); err != nil { @@ -253,3 +272,46 @@ func parseTipSet(api api.FullNode, ctx context.Context, vals []string) (*types.T return types.NewTipSet(headers) } + +var chainListCmd = &cli.Command{ + Name: "list", + Usage: "View a segment of the chain", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + head, err := api.ChainHead(ctx) + if err != nil { + return err + } + + tss := []*types.TipSet{head} + cur := head + for i := 1; i < 30; i++ { + if cur.Height() == 0 { + break + } + + next, err := api.ChainGetTipSet(ctx, cur.Parents()) + if err != nil { + return err + } + + tss = append(tss, next) + cur = next + } + + for i := len(tss) - 1; i >= 0; i-- { + fmt.Printf("%d [ ", tss[i].Height()) + for _, b := range tss[i].Blocks() { + fmt.Printf("%s: %s,", b.Cid(), b.Miner) + } + fmt.Println("]") + } + return nil + }, +} diff --git a/cmd/lotus-townhall/main.go b/cmd/lotus-townhall/main.go new file mode 100644 index 000000000..64b7f423c --- /dev/null +++ b/cmd/lotus-townhall/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/gorilla/websocket" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" + pnet "github.com/libp2p/go-libp2p-pnet" + pubsub "github.com/libp2p/go-libp2p-pubsub" + + "github.com/filecoin-project/go-lotus/lib/addrutil" + "github.com/filecoin-project/go-lotus/node/modules/lp2p" +) + +const topic = "/fil/headnotifs/bafy2bzacedjqrkfbuafakygo6vlkrqozvsju2d5k6g24ry3mjjfxwrvet2636" + +var upgrader = websocket.Upgrader{ + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func main() { + ctx := context.Background() + + protec, err := pnet.NewProtector(strings.NewReader(lp2p.LotusKey)) + if err != nil { + panic(err) + } + + host, err := libp2p.New( + ctx, + libp2p.Defaults, + libp2p.PrivateNetwork(protec), + ) + if err != nil { + panic(err) + } + ps, err := pubsub.NewGossipSub(ctx, host) + if err != nil { + panic(err) + } + + pi, err := addrutil.ParseAddresses(ctx, []string{ + "/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWAShT7qd3oM7QPC8AsQffs6ThH69fZGui4xCW68E35rDP", + }) + if err != nil { + panic(err) + } + + if err := host.Connect(ctx, pi[0]); err != nil { + panic(err) + } + + http.HandleFunc("/sub", handler(ps)) + + fmt.Println("listening") + + if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil { + panic(err) + } +} + +type update struct { + From peer.ID + Update json.RawMessage +} + +func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + if r.Header.Get("Sec-WebSocket-Protocol") != "" { + w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + sub, err := ps.Subscribe(topic) + if err != nil { + return + } + + for { + msg, err := sub.Next(r.Context()) + if err != nil { + return + } + + fmt.Println(msg) + + if err := conn.WriteJSON(update{ + From: peer.ID(msg.From), + Update: msg.Data, + }); err != nil { + return + } + } + } +} diff --git a/cmd/lotus-townhall/townhall/.gitignore b/cmd/lotus-townhall/townhall/.gitignore new file mode 100644 index 000000000..4d29575de --- /dev/null +++ b/cmd/lotus-townhall/townhall/.gitignore @@ -0,0 +1,23 @@ +# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. + +# dependencies +/node_modules +/.pnp +.pnp.js + +# testing +/coverage + +# production +/build + +# misc +.DS_Store +.env.local +.env.development.local +.env.test.local +.env.production.local + +npm-debug.log* +yarn-debug.log* +yarn-error.log* diff --git a/cmd/lotus-townhall/townhall/package.json b/cmd/lotus-townhall/townhall/package.json new file mode 100644 index 000000000..5a8167622 --- /dev/null +++ b/cmd/lotus-townhall/townhall/package.json @@ -0,0 +1,31 @@ +{ + "name": "townhall", + "version": "0.1.0", + "private": true, + "dependencies": { + "react": "^16.10.2", + "react-dom": "^16.10.2", + "react-scripts": "3.2.0" + }, + "scripts": { + "start": "react-scripts start", + "build": "react-scripts build", + "test": "react-scripts test", + "eject": "react-scripts eject" + }, + "eslintConfig": { + "extends": "react-app" + }, + "browserslist": { + "production": [ + ">0.2%", + "not dead", + "not op_mini all" + ], + "development": [ + "last 1 chrome version", + "last 1 firefox version", + "last 1 safari version" + ] + } +} diff --git a/cmd/lotus-townhall/townhall/public/index.html b/cmd/lotus-townhall/townhall/public/index.html new file mode 100644 index 000000000..38af10597 --- /dev/null +++ b/cmd/lotus-townhall/townhall/public/index.html @@ -0,0 +1,13 @@ + + + + + + + Lotus TownHall + + + +
+ + diff --git a/cmd/lotus-townhall/townhall/public/robots.txt b/cmd/lotus-townhall/townhall/public/robots.txt new file mode 100644 index 000000000..01b0f9a10 --- /dev/null +++ b/cmd/lotus-townhall/townhall/public/robots.txt @@ -0,0 +1,2 @@ +# https://www.robotstxt.org/robotstxt.html +User-agent: * diff --git a/cmd/lotus-townhall/townhall/src/App.css b/cmd/lotus-townhall/townhall/src/App.css new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/cmd/lotus-townhall/townhall/src/App.css @@ -0,0 +1 @@ + diff --git a/cmd/lotus-townhall/townhall/src/App.js b/cmd/lotus-townhall/townhall/src/App.js new file mode 100644 index 000000000..426588d60 --- /dev/null +++ b/cmd/lotus-townhall/townhall/src/App.js @@ -0,0 +1,40 @@ +import React from 'react'; +import './App.css'; + +class App extends React.Component { + constructor(props) { + super(props); + + //let ws = new WebSocket("ws://" + window.location.host + "/sub") + let ws = new WebSocket("ws://127.0.0.1:2975/sub") + + ws.onmessage = (ev) => { + console.log(ev) + let update = JSON.parse(ev.data) + + this.setState( prev => ({ + ...prev, [update.From]: update.Update, + })) + } + + this.state = {} + } + + render() { + let best = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Height ? p : n.Height, -1) + console.log(best) + + return {Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => { + let l = [, , ] + if (best !== v.Height) { + l = {l} + } else { + l = {l} + } + + return l + }) + }
{k}{v.NodeName}{v.Height}
+ } +} +export default App; diff --git a/cmd/lotus-townhall/townhall/src/App.test.js b/cmd/lotus-townhall/townhall/src/App.test.js new file mode 100644 index 000000000..a754b201b --- /dev/null +++ b/cmd/lotus-townhall/townhall/src/App.test.js @@ -0,0 +1,9 @@ +import React from 'react'; +import ReactDOM from 'react-dom'; +import App from './App'; + +it('renders without crashing', () => { + const div = document.createElement('div'); + ReactDOM.render(, div); + ReactDOM.unmountComponentAtNode(div); +}); diff --git a/cmd/lotus-townhall/townhall/src/index.css b/cmd/lotus-townhall/townhall/src/index.css new file mode 100644 index 000000000..fb0d9d10e --- /dev/null +++ b/cmd/lotus-townhall/townhall/src/index.css @@ -0,0 +1,6 @@ +body { + margin: 0; + font-family: monospace; + background: #1f1f1f; + color: #f0f0f0; +} diff --git a/cmd/lotus-townhall/townhall/src/index.js b/cmd/lotus-townhall/townhall/src/index.js new file mode 100644 index 000000000..395b74997 --- /dev/null +++ b/cmd/lotus-townhall/townhall/src/index.js @@ -0,0 +1,6 @@ +import React from 'react'; +import ReactDOM from 'react-dom'; +import './index.css'; +import App from './App'; + +ReactDOM.render(, document.getElementById('root')); diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index a65c5ae39..075de8b39 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -94,37 +94,16 @@ var DaemonCmd = &cli.Command{ } return lr.SetAPIEndpoint(apima) }), + + node.ApplyIf(func(s *node.Settings) bool { return cctx.Bool("bootstrap") }, + node.Override(node.BootstrapKey, modules.Bootstrap), + ), ) if err != nil { return err } - go func() { - if !cctx.Bool("bootstrap") { - return - } - err := bootstrap(ctx, api) - if err != nil { - log.Error("Bootstrap failed: ", err) - } - }() - // TODO: properly parse api endpoint (or make it a URL) return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api")) }, } - -func bootstrap(ctx context.Context, api api.FullNode) error { - pis, err := build.BuiltinBootstrap() - if err != nil { - return err - } - - for _, pi := range pis { - if err := api.NetConnect(ctx, pi); err != nil { - return err - } - } - - return nil -} diff --git a/extern/go-bls-sigs b/extern/go-bls-sigs index 03705e06e..c221eb016 160000 --- a/extern/go-bls-sigs +++ b/extern/go-bls-sigs @@ -1 +1 @@ -Subproject commit 03705e06e83ac0d4c98695dacd0f20a350cc93d7 +Subproject commit c221eb016ab7074465f444fb592c5184e2df3926 diff --git a/go.mod b/go.mod index 807508321..8ab84c6fd 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-cbor v0.0.3 github.com/ipfs/go-ipld-format v0.0.2 - github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59 + github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144 github.com/ipfs/go-merkledag v0.2.3 github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 diff --git a/go.sum b/go.sum index abbe9d439..4d000e1a4 100644 --- a/go.sum +++ b/go.sum @@ -254,8 +254,8 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC github.com/ipfs/go-ipld-format v0.0.2 h1:OVAGlyYT6JPZ0pEfGntFPS40lfrDmaDbQwNHEY2G9Zs= github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= -github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59 h1:gUmFTK1r79usWs8LQJsoMw+yba2qs/EVj9kmPRvSrYM= -github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59/go.mod h1:azGN5dH7ailfREknDDNYB0Eq4qZ/4I4Y3gO0ivjJNyM= +github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144 h1:5WM8S1nwquWQ3zEuNhK82NE5Di6Pd41qz9JxxvxTAIA= +github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144/go.mod h1:azGN5dH7ailfREknDDNYB0Eq4qZ/4I4Y3gO0ivjJNyM= github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.2.3 h1:aMdkK9G1hEeNvn3VXfiEMLY0iJnbiQQUHnM0HFJREsE= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= diff --git a/miner/miner.go b/miner/miner.go index 1a7a6c3c7..9ef74787d 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -219,7 +219,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) { } func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { - log.Info("attempting to mine a block on:", base.ts.Cids()) + log.Infow("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids())) ticket, err := m.scratchTicket(ctx, base) if err != nil { return nil, errors.Wrap(err, "scratching ticket failed") @@ -239,7 +239,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, if err != nil { return nil, errors.Wrap(err, "failed to create block") } - log.Infof("mined new block: %s", b.Cid()) + log.Infow("mined new block", "cid", b.Cid()) return b, nil } diff --git a/node/builder.go b/node/builder.go index 67c91345c..54e75c6c7 100644 --- a/node/builder.go +++ b/node/builder.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/deals" + "github.com/filecoin-project/go-lotus/chain/metrics" "github.com/filecoin-project/go-lotus/chain/stmgr" "github.com/filecoin-project/go-lotus/chain/store" "github.com/filecoin-project/go-lotus/chain/types" @@ -70,6 +71,7 @@ const ( PstoreAddSelfKeysKey = invoke(iota) StartListeningKey + BootstrapKey // filecoin SetGenesisKey @@ -90,6 +92,7 @@ const ( // daemon ExtractApiKey + HeadMetricsKey SetApiEndpointKey @@ -225,6 +228,7 @@ func Online() Option { Override(RunHelloKey, modules.RunHello), Override(RunBlockSyncKey, modules.RunBlockSync), Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + Override(HeadMetricsKey, metrics.SendHeadNotifs("")), Override(new(*discovery.Local), discovery.NewLocal), Override(new(discovery.PeerResolver), modules.RetrievalResolver), @@ -289,6 +293,10 @@ func Config(cfg *config.Root) Option { ApplyIf(func(s *Settings) bool { return s.Online }, Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)), + + ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, + Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), + ), ), ) } diff --git a/node/config/def.go b/node/config/def.go index b7bf7a21f..11480a593 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -6,6 +6,8 @@ import "time" type Root struct { API API Libp2p Libp2p + + Metrics Metrics } // API contains configs for API endpoint @@ -19,6 +21,10 @@ type Libp2p struct { ListenAddresses []string } +type Metrics struct { + Nickname string +} + // Default returns the default config func Default() *Root { def := Root{ diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 0d68d22cb..748cb3ad6 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -50,6 +50,10 @@ func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Block return a.Chain.GetBlock(msg) } +func (a *ChainAPI) ChainGetTipSet(ctx context.Context, cids []cid.Cid) (*types.TipSet, error) { + return a.Chain.LoadTipSet(cids) +} + func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) { b, err := a.Chain.GetBlock(msg) if err != nil { @@ -161,3 +165,12 @@ func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error func (a *ChainAPI) ChainSetHead(ctx context.Context, ts *types.TipSet) error { return a.Chain.SetHead(ts) } + +func (a *ChainAPI) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) { + genb, err := a.Chain.GetGenesis() + if err != nil { + return nil, err + } + + return types.NewTipSet([]*types.BlockHeader{genb}) +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go index efa6eeebd..efb946e3d 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -129,12 +129,12 @@ func (a *StateAPI) StateReplay(ctx context.Context, ts *types.TipSet, mc cid.Cid }, nil } -func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) { +func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) { if ts == nil { ts = a.Chain.GetHeaviestTipSet() } - st, _, err := a.StateManager.TipSetState(ts) + st, _, err := a.StateManager.TipSetState(ctx, ts) if err != nil { return nil, err } @@ -145,7 +145,7 @@ func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) { } func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) { - state, err := a.stateForTs(ts) + state, err := a.stateForTs(ctx, ts) if err != nil { return nil, err } @@ -154,7 +154,7 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts } func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) { - state, err := a.stateForTs(ts) + state, err := a.stateForTs(ctx, ts) if err != nil { return nil, err } diff --git a/node/modules/core.go b/node/modules/core.go index 547cf0fe1..bade468bf 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -1,15 +1,20 @@ package modules import ( + "context" "crypto/rand" - "io" - "io/ioutil" - + "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" + "go.uber.org/fx" "golang.org/x/xerrors" + "io" + "io/ioutil" + "time" "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/chain/types" @@ -70,3 +75,46 @@ func APISecret(keystore types.KeyStore, lr repo.LockedRepo) (*dtypes.APIAlg, err return (*dtypes.APIAlg)(jwt.NewHS256(key.PrivateKey)), nil } + +func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) { + ctx, cancel := context.WithCancel(mctx) + + lc.Append(fx.Hook{ + OnStart: func(_ context.Context) error { + go func() { + for { + sctx, cancel := context.WithTimeout(ctx, build.BlockDelay*time.Second/2) + <-sctx.Done() + cancel() + + if ctx.Err() != nil { + return + } + + if len(host.Network().Conns()) > 0 { + continue + } + + log.Warn("No peers connected, performing automatic bootstrap") + + pis, err := build.BuiltinBootstrap() + if err != nil { + log.Error("Getting bootstrap addrs: ", err) + return + } + + for _, pi := range pis { + if err := host.Connect(ctx, pi); err != nil { + log.Warn("bootstrap connect failed: ", err) + } + } + } + }() + return nil + }, + OnStop: func(_ context.Context) error { + cancel() + return nil + }, + }) +} diff --git a/node/modules/lp2p/pnet.go b/node/modules/lp2p/pnet.go index ffae463f8..3fe8e89df 100644 --- a/node/modules/lp2p/pnet.go +++ b/node/modules/lp2p/pnet.go @@ -8,12 +8,12 @@ import ( pnet "github.com/libp2p/go-libp2p-pnet" ) -var lotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e" +var LotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e" type PNetFingerprint []byte func PNet() (opts Libp2pOpts, fp PNetFingerprint, err error) { - protec, err := pnet.NewProtector(strings.NewReader(lotusKey)) + protec, err := pnet.NewProtector(strings.NewReader(LotusKey)) if err != nil { return opts, nil, fmt.Errorf("failed to configure private network: %s", err) } diff --git a/scripts/daemon.service b/scripts/daemon.service index 2055e4371..8d93518b5 100644 --- a/scripts/daemon.service +++ b/scripts/daemon.service @@ -4,6 +4,8 @@ After=network.target [Service] ExecStart=/usr/local/bin/lotus daemon +Environment=GOLOG_FILE="/root/.lotus/logs" +Environment=GOLOG_LOG_FMT="json" [Install] -WantedBy=multiuser.target \ No newline at end of file +WantedBy=multiuser.target diff --git a/scripts/filebeat.yml b/scripts/filebeat.yml new file mode 100644 index 000000000..0a027b655 --- /dev/null +++ b/scripts/filebeat.yml @@ -0,0 +1,66 @@ +############################# Filebeat ##################################### + +filebeat.inputs: + +- type: log + paths: + - /root/.lotusstorage/logs + fields: + logzio_codec: json + token: + type: lotus-miner + fields_under_root: true + json.keys_under_root: false + encoding: utf-8 + ignore_older: 3h +- type: log + paths: + - /root/.lotus/logs + fields: + logzio_codec: json + token: + type: lotus-daemon + fields_under_root: true + json.keys_under_root: false + encoding: utf-8 + ignore_older: 3h + +#For version 6.x and lower +#filebeat.registry_file: /var/lib/filebeat/registry + +#For version 7 and higher +filebeat.registry.path: /var/lib/filebeat + +#The following processors are to ensure compatibility with version 7 +processors: +- rename: + fields: + - from: "agent" + to: "beat_agent" + ignore_missing: true +- rename: + fields: + - from: "log.file.path" + to: "source" + ignore_missing: true + +- if: + has_fields: ['json.ts'] + then: + - timestamp: + field: 'json.ts' + layouts: + - '2006-01-02T15:04:05.000Z0700' + test: + - '2019-10-10T22:37:48.297+0200' + - drop_fields: + fields: ['json.ts'] + + +############################# Output ########################################## + +output: + logstash: + hosts: ["listener.logz.io:5015"] + ssl: + certificate_authorities: ['/etc/pki/tls/certs/COMODORSADomainValidationSecureServerCA.crt'] diff --git a/scripts/sminer.service b/scripts/sminer.service index c5453ef06..ed966a5d2 100644 --- a/scripts/sminer.service +++ b/scripts/sminer.service @@ -4,6 +4,8 @@ After=network.target [Service] ExecStart=/usr/local/bin/lotus-storage-miner run +Environment=GOLOG_FILE="/root/.lotusstorage/logs" +Environment=GOLOG_LOG_FMT="json" [Install] WantedBy=multiuser.target diff --git a/storage/post.go b/storage/post.go index 8f4e9686c..0b49ff509 100644 --- a/storage/post.go +++ b/storage/post.go @@ -2,7 +2,7 @@ package storage import ( "context" - "encoding/base64" + "time" "golang.org/x/xerrors" @@ -67,14 +67,14 @@ func (m *Miner) scheduleNextPost(ppe uint64) { headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height()) if headPPE > ppe { - log.Warn("PoSt computation running behind chain") + log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe) ppe = headPPE } m.schedLk.Lock() if m.schedPost >= ppe { // this probably can't happen - log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe) + log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe) m.schedLk.Unlock() return } @@ -82,7 +82,8 @@ func (m *Miner) scheduleNextPost(ppe uint64) { m.schedPost = ppe m.schedLk.Unlock() - log.Infof("Scheduling post at height %d (head=%d; ppe=%d, period=%d)", ppe-build.PoSTChallangeTime, ts.Height(), ppe, provingPeriod) + log.Infow("scheduling PoSt", "post-height", ppe-build.PoSTChallangeTime, + "height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod) err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert // TODO: Cancel post log.Errorf("TODO: Cancel PoSt, re-run") @@ -90,7 +91,7 @@ func (m *Miner) scheduleNextPost(ppe uint64) { }, PoStConfidence, ppe-build.PoSTChallangeTime) if err != nil { // TODO: This is BAD, figure something out - log.Errorf("scheduling PoSt failed: %s", err) + log.Errorf("scheduling PoSt failed: %+v", err) return } } @@ -110,14 +111,19 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", ts.Height(), ppe, err) } - log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height()) + log.Infow("running PoSt", "delayed-by", + int64(ts.Height())-(int64(ppe)-int64(build.PoSTChallangeTime)), + "chain-random", r, "ppe", ppe, "height", ts.Height()) + + tsStart := time.Now() var faults []uint64 proof, err := m.secst.RunPoSt(ctx, sset, r, faults) if err != nil { return xerrors.Errorf("running post failed: %w", err) } + elapsed := time.Since(tsStart) - log.Infof("submitting PoSt pLen=%d", len(proof)) + log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed) params := &actors.SubmitPoStParams{ Proof: proof,