Merge remote-tracking branch 'upstream/master'

This commit is contained in:
eshon 2019-10-11 14:00:47 +02:00
commit 504b359ba5
37 changed files with 697 additions and 77 deletions

View File

@ -54,12 +54,14 @@ type FullNode interface {
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetTipSet(context.Context, []cid.Cid) (*types.TipSet, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error)
ChainGetParentMessages(context.Context, cid.Cid) ([]Message, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainSetHead(context.Context, *types.TipSet) error
ChainGetGenesis(context.Context) (*types.TipSet, error)
// syncer
SyncState(context.Context) (*SyncState, error)

View File

@ -42,12 +42,14 @@ type FullNodeStruct struct {
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetTipSet func(context.Context, []cid.Cid) (*types.TipSet, error) `perm:"read"`
ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
ChainGetParentMessages func(context.Context, cid.Cid) ([]Message, error) `perm:"read"`
ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"`
ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"`
ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"`
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
@ -284,6 +286,10 @@ func (c *FullNodeStruct) ChainGetBlock(ctx context.Context, b cid.Cid) (*types.B
return c.Internal.ChainGetBlock(ctx, b)
}
func (c *FullNodeStruct) ChainGetTipSet(ctx context.Context, cids []cid.Cid) (*types.TipSet, error) {
return c.Internal.ChainGetTipSet(ctx, cids)
}
func (c *FullNodeStruct) ChainGetBlockMessages(ctx context.Context, b cid.Cid) (*BlockMessages, error) {
return c.Internal.ChainGetBlockMessages(ctx, b)
}
@ -308,6 +314,10 @@ func (c *FullNodeStruct) ChainSetHead(ctx context.Context, ts *types.TipSet) err
return c.Internal.ChainSetHead(ctx, ts)
}
func (c *FullNodeStruct) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) {
return c.Internal.ChainGetGenesis(ctx)
}
func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) {
return c.Internal.SyncState(ctx)
}

View File

@ -20,7 +20,7 @@ import (
)
func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wallet, miner address.Address, parents *types.TipSet, tickets []*types.Ticket, proof types.ElectionProof, msgs []*types.SignedMessage, timestamp uint64) (*types.FullBlock, error) {
st, recpts, err := sm.TipSetState(parents)
st, recpts, err := sm.TipSetState(ctx, parents)
if err != nil {
return nil, errors.Wrap(err, "failed to load tipset state")
}

112
chain/metrics/consensus.go Normal file
View File

@ -0,0 +1,112 @@
package metrics
import (
"context"
"encoding/json"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
)
var log = logging.Logger("metrics")
const baseTopic = "/fil/headnotifs/"
type Update struct {
Type string
}
func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
gen, err := chain.Chain.GetGenesis()
if err != nil {
return err
}
topic := baseTopic + gen.Cid().String()
go func() {
if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil {
log.Error("consensus metrics error", err)
return
}
}()
go func() {
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
defer sub.Cancel()
for {
if _, err := sub.Next(ctx); err != nil {
return
}
}
}()
return nil
},
})
return nil
}
}
type message struct {
// TipSet
Cids []cid.Cid
Blocks []*types.BlockHeader
Height uint64
Weight types.BigInt
// Meta
NodeName string
}
func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notifs, err := chain.ChainNotify(ctx)
if err != nil {
return err
}
for {
select {
case notif := <-notifs:
n := notif[len(notif)-1]
m := message{
Cids: n.Val.Cids(),
Blocks: n.Val.Blocks(),
Height: n.Val.Height(),
Weight: n.Val.Weight(),
NodeName: nickname,
}
b, err := json.Marshal(m)
if err != nil {
return err
}
if err := ps.Publish(topic, b); err != nil {
return err
}
case <-ctx.Done():
return nil
}
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-cid"
hamt "github.com/ipfs/go-hamt-ipld"
logging "github.com/ipfs/go-log"
"go.opencensus.io/trace"
)
var log = logging.Logger("statemgr")
@ -45,8 +46,7 @@ func cidsToKey(cids []cid.Cid) string {
return out
}
func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ctx := context.TODO()
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ck := cidsToKey(ts.Cids())
sm.stlk.Lock()
@ -76,6 +76,9 @@ func (sm *StateManager) TipSetState(ts *types.TipSet) (cid.Cid, cid.Cid, error)
}
func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.BlockHeader, cb func(cid.Cid, *types.Message, *vm.ApplyRet) error) (cid.Cid, cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "computeTipSetState")
defer span.End()
pstate := blks[0].ParentStateRoot
cids := make([]cid.Cid, len(blks))
@ -248,7 +251,7 @@ func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Ad
ts = sm.cs.GetHeaviestTipSet()
}
st, _, err := sm.TipSetState(ts)
st, _, err := sm.TipSetState(ctx, ts)
if err != nil {
return address.Undef, xerrors.Errorf("resolve address failed to get tipset state: %w", err)
}

View File

@ -45,6 +45,7 @@ type ChainStore struct {
tstLk sync.Mutex
tipsets map[uint64][]cid.Cid
reorgCh chan<- reorg
headChangeNotifs []func(rev, app []*types.TipSet) error
}
@ -56,6 +57,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
tipsets: make(map[uint64][]cid.Cid),
}
cs.reorgCh = cs.reorgWorker(context.TODO())
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
@ -217,17 +220,46 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
return nil
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil {
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
type reorg struct {
old *types.TipSet
new *types.TipSet
}
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
out := make(chan reorg, 32)
go func() {
defer log.Warn("reorgWorker quit")
for {
select {
case r := <-out:
revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil {
log.Error("computing reorg ops failed: ", err)
continue
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err)
}
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
}
cs.reorgCh <- reorg{
old: cs.heaviest,
new: ts,
}
} else {
log.Warn("no heaviest tipset found, using %s", ts.Cids())
}
@ -693,7 +725,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets
}
lt := int64(len(tickets))
if lb < lt {
log.Desugar().Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug", zap.Stack("call-stack"))
log.Desugar().Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug", zap.Stack("stacktrace"))
t := tickets[lt-(1+lb)]

View File

@ -44,7 +44,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return
}
log.Infof("inform new block over pubsub: %s from %s", blk.Header.Cid(), msg.GetFrom())
log.Infow("new block over pubsub", "cid", blk.Header.Cid(), "source", msg.GetFrom())
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,

View File

@ -24,6 +24,7 @@ import (
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
)
@ -87,6 +88,7 @@ const BootstrapPeerThreshold = 1
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
ctx := context.Background()
if fts == nil {
panic("bad")
}
@ -102,7 +104,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
// TODO: this is kindof a hack...
log.Info("got block from ourselves")
if err := syncer.Sync(fts.TipSet()); err != nil {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("failed to sync our own block %s: %+v", fts.TipSet().Cids(), err)
}
@ -114,7 +116,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.Bsync.AddPeer(from)
go func() {
if err := syncer.Sync(fts.TipSet()); err != nil {
if err := syncer.Sync(ctx, fts.TipSet()); err != nil {
log.Errorf("sync error: %+v", err)
}
}()
@ -327,9 +329,10 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
return fts, nil
}
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
ctx := context.TODO()
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
@ -420,6 +423,7 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
// Should match up with 'Semantical Validation' in validation.md in the spec
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
h := b.Header
baseTs, err := syncer.store.LoadTipSet(h.Parents)
@ -427,7 +431,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
}
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
stateroot, precp, err := syncer.sm.TipSetState(ctx, baseTs)
if err != nil {
return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err)
}
@ -594,7 +598,7 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu
var bsig bls.Signature
copy(bsig[:], sig.Data)
if !bls.Verify(bsig, digests, pubks) {
if !bls.Verify(&bsig, digests, pubks) {
return xerrors.New("bls aggregate signature failed to verify")
}
@ -720,11 +724,11 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
return nil, xerrors.Errorf("fork was longer than our threshold")
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(context.TODO(), fts); err != nil {
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
log.Errorf("failed to validate tipset: %+v", err)
return xerrors.Errorf("message processing failed: %w", err)
}
@ -847,11 +851,12 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
syncer.syncState.SetStage(api.StageMessages)
if err := syncer.syncMessagesAndCheckState(headers); err != nil {
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
return xerrors.Errorf("collectChain syncMessages: %w", err)
}
syncer.syncState.SetStage(api.StageSyncComplete)
log.Infow("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))
return nil
}

17
chain/types/logs.go Normal file
View File

@ -0,0 +1,17 @@
package types
import (
"github.com/ipfs/go-cid"
"go.uber.org/zap/zapcore"
)
type LogCids []cid.Cid
var _ zapcore.ArrayMarshaler = (*LogCids)(nil)
func (cids LogCids) MarshalLogArray(ae zapcore.ArrayEncoder) error {
for _, c := range cids {
ae.AppendString(c.String())
}
return nil
}

View File

@ -44,7 +44,7 @@ func (s *Signature) Verify(addr address.Address, msg []byte) error {
var sig bls.Signature
copy(sig[:], s.Data)
if !bls.Verify(sig, digests, pubkeys) {
if !bls.Verify(&sig, digests, pubkeys) {
return fmt.Errorf("bls signature failed to verify")
}

View File

@ -430,6 +430,13 @@ func checkMessage(msg *types.Message) error {
func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
ctx, span := trace.StartSpan(ctx, "vm.ApplyMessage")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("to", msg.To.String()),
trace.Int64Attribute("method", int64(msg.Method)),
trace.StringAttribute("value", msg.Value.String()),
)
}
if err := checkMessage(msg); err != nil {
return nil, err
@ -605,6 +612,7 @@ func (vm *VM) SetBlockHeight(h uint64) {
func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params []byte) ([]byte, aerrors.ActorError) {
ctx, span := trace.StartSpan(vmctx.ctx, "vm.Invoke")
defer span.End()
var oldCtx context.Context
oldCtx, vmctx.ctx = vmctx.ctx, ctx
defer func() {

View File

@ -22,6 +22,7 @@ var chainCmd = &cli.Command{
chainReadObjCmd,
chainGetMsgCmd,
chainSetHeadCmd,
chainListCmd,
},
}
@ -210,6 +211,12 @@ var chainGetMsgCmd = &cli.Command{
var chainSetHeadCmd = &cli.Command{
Name: "sethead",
Usage: "manually set the local nodes head tipset (Caution: normally only used for recovery)",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "genesis",
Usage: "reset head to genesis",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -218,13 +225,25 @@ var chainSetHeadCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
gen := cctx.Bool("genesis")
if !cctx.Args().Present() && !gen {
return fmt.Errorf("must pass cids for tipset to set as head")
}
ts, err := parseTipSet(api, ctx, cctx.Args().Slice())
if err != nil {
return err
var ts *types.TipSet
if gen {
gents, err := api.ChainGetGenesis(ctx)
if err != nil {
return err
}
ts = gents
} else {
parsedts, err := parseTipSet(api, ctx, cctx.Args().Slice())
if err != nil {
return err
}
ts = parsedts
}
if err := api.ChainSetHead(ctx, ts); err != nil {
@ -253,3 +272,46 @@ func parseTipSet(api api.FullNode, ctx context.Context, vals []string) (*types.T
return types.NewTipSet(headers)
}
var chainListCmd = &cli.Command{
Name: "list",
Usage: "View a segment of the chain",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
head, err := api.ChainHead(ctx)
if err != nil {
return err
}
tss := []*types.TipSet{head}
cur := head
for i := 1; i < 30; i++ {
if cur.Height() == 0 {
break
}
next, err := api.ChainGetTipSet(ctx, cur.Parents())
if err != nil {
return err
}
tss = append(tss, next)
cur = next
}
for i := len(tss) - 1; i >= 0; i-- {
fmt.Printf("%d [ ", tss[i].Height())
for _, b := range tss[i].Blocks() {
fmt.Printf("%s: %s,", b.Cid(), b.Miner)
}
fmt.Println("]")
}
return nil
},
}

108
cmd/lotus-townhall/main.go Normal file
View File

@ -0,0 +1,108 @@
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/filecoin-project/go-lotus/lib/addrutil"
"github.com/filecoin-project/go-lotus/node/modules/lp2p"
)
const topic = "/fil/headnotifs/bafy2bzacedjqrkfbuafakygo6vlkrqozvsju2d5k6g24ry3mjjfxwrvet2636"
var upgrader = websocket.Upgrader{
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
ctx := context.Background()
protec, err := pnet.NewProtector(strings.NewReader(lp2p.LotusKey))
if err != nil {
panic(err)
}
host, err := libp2p.New(
ctx,
libp2p.Defaults,
libp2p.PrivateNetwork(protec),
)
if err != nil {
panic(err)
}
ps, err := pubsub.NewGossipSub(ctx, host)
if err != nil {
panic(err)
}
pi, err := addrutil.ParseAddresses(ctx, []string{
"/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWAShT7qd3oM7QPC8AsQffs6ThH69fZGui4xCW68E35rDP",
})
if err != nil {
panic(err)
}
if err := host.Connect(ctx, pi[0]); err != nil {
panic(err)
}
http.HandleFunc("/sub", handler(ps))
fmt.Println("listening")
if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil {
panic(err)
}
}
type update struct {
From peer.ID
Update json.RawMessage
}
func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
if r.Header.Get("Sec-WebSocket-Protocol") != "" {
w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
for {
msg, err := sub.Next(r.Context())
if err != nil {
return
}
fmt.Println(msg)
if err := conn.WriteJSON(update{
From: peer.ID(msg.From),
Update: msg.Data,
}); err != nil {
return
}
}
}
}

23
cmd/lotus-townhall/townhall/.gitignore vendored Normal file
View File

@ -0,0 +1,23 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.
# dependencies
/node_modules
/.pnp
.pnp.js
# testing
/coverage
# production
/build
# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local
npm-debug.log*
yarn-debug.log*
yarn-error.log*

View File

@ -0,0 +1,31 @@
{
"name": "townhall",
"version": "0.1.0",
"private": true,
"dependencies": {
"react": "^16.10.2",
"react-dom": "^16.10.2",
"react-scripts": "3.2.0"
},
"scripts": {
"start": "react-scripts start",
"build": "react-scripts build",
"test": "react-scripts test",
"eject": "react-scripts eject"
},
"eslintConfig": {
"extends": "react-app"
},
"browserslist": {
"production": [
">0.2%",
"not dead",
"not op_mini all"
],
"development": [
"last 1 chrome version",
"last 1 firefox version",
"last 1 safari version"
]
}
}

View File

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<meta name="theme-color" content="#1a1a1a" />
<title>Lotus TownHall</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>
<div id="root"></div>
</body>
</html>

View File

@ -0,0 +1,2 @@
# https://www.robotstxt.org/robotstxt.html
User-agent: *

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,40 @@
import React from 'react';
import './App.css';
class App extends React.Component {
constructor(props) {
super(props);
//let ws = new WebSocket("ws://" + window.location.host + "/sub")
let ws = new WebSocket("ws://127.0.0.1:2975/sub")
ws.onmessage = (ev) => {
console.log(ev)
let update = JSON.parse(ev.data)
this.setState( prev => ({
...prev, [update.From]: update.Update,
}))
}
this.state = {}
}
render() {
let best = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Height ? p : n.Height, -1)
console.log(best)
return <table>{Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => {
let l = [<td>{k}</td>, <td>{v.NodeName}</td>, <td>{v.Height}</td>]
if (best !== v.Height) {
l = <tr style={{color: '#f00'}}>{l}</tr>
} else {
l = <tr>{l}</tr>
}
return l
})
}</table>
}
}
export default App;

View File

@ -0,0 +1,9 @@
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';
it('renders without crashing', () => {
const div = document.createElement('div');
ReactDOM.render(<App />, div);
ReactDOM.unmountComponentAtNode(div);
});

View File

@ -0,0 +1,6 @@
body {
margin: 0;
font-family: monospace;
background: #1f1f1f;
color: #f0f0f0;
}

View File

@ -0,0 +1,6 @@
import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';
ReactDOM.render(<App />, document.getElementById('root'));

View File

@ -94,37 +94,16 @@ var DaemonCmd = &cli.Command{
}
return lr.SetAPIEndpoint(apima)
}),
node.ApplyIf(func(s *node.Settings) bool { return cctx.Bool("bootstrap") },
node.Override(node.BootstrapKey, modules.Bootstrap),
),
)
if err != nil {
return err
}
go func() {
if !cctx.Bool("bootstrap") {
return
}
err := bootstrap(ctx, api)
if err != nil {
log.Error("Bootstrap failed: ", err)
}
}()
// TODO: properly parse api endpoint (or make it a URL)
return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api"))
},
}
func bootstrap(ctx context.Context, api api.FullNode) error {
pis, err := build.BuiltinBootstrap()
if err != nil {
return err
}
for _, pi := range pis {
if err := api.NetConnect(ctx, pi); err != nil {
return err
}
}
return nil
}

2
extern/go-bls-sigs vendored

@ -1 +1 @@
Subproject commit 03705e06e83ac0d4c98695dacd0f20a350cc93d7
Subproject commit c221eb016ab7074465f444fb592c5184e2df3926

2
go.mod
View File

@ -34,7 +34,7 @@ require (
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipld-cbor v0.0.3
github.com/ipfs/go-ipld-format v0.0.2
github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144
github.com/ipfs/go-merkledag v0.2.3
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52

4
go.sum
View File

@ -254,8 +254,8 @@ github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dC
github.com/ipfs/go-ipld-format v0.0.2 h1:OVAGlyYT6JPZ0pEfGntFPS40lfrDmaDbQwNHEY2G9Zs=
github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k=
github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM=
github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59 h1:gUmFTK1r79usWs8LQJsoMw+yba2qs/EVj9kmPRvSrYM=
github.com/ipfs/go-log v0.0.2-0.20190905183954-62f287c7db59/go.mod h1:azGN5dH7ailfREknDDNYB0Eq4qZ/4I4Y3gO0ivjJNyM=
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144 h1:5WM8S1nwquWQ3zEuNhK82NE5Di6Pd41qz9JxxvxTAIA=
github.com/ipfs/go-log v0.0.2-0.20190920042044-a609c1ae5144/go.mod h1:azGN5dH7ailfREknDDNYB0Eq4qZ/4I4Y3gO0ivjJNyM=
github.com/ipfs/go-merkledag v0.1.0/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.2.3 h1:aMdkK9G1hEeNvn3VXfiEMLY0iJnbiQQUHnM0HFJREsE=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=

View File

@ -219,7 +219,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
}
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Info("attempting to mine a block on:", base.ts.Cids())
log.Infow("attempting to mine a block", "tipset", types.LogCids(base.ts.Cids()))
ticket, err := m.scratchTicket(ctx, base)
if err != nil {
return nil, errors.Wrap(err, "scratching ticket failed")
@ -239,7 +239,7 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
if err != nil {
return nil, errors.Wrap(err, "failed to create block")
}
log.Infof("mined new block: %s", b.Cid())
log.Infow("mined new block", "cid", b.Cid())
return b, nil
}

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/deals"
"github.com/filecoin-project/go-lotus/chain/metrics"
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -70,6 +71,7 @@ const (
PstoreAddSelfKeysKey = invoke(iota)
StartListeningKey
BootstrapKey
// filecoin
SetGenesisKey
@ -90,6 +92,7 @@ const (
// daemon
ExtractApiKey
HeadMetricsKey
SetApiEndpointKey
@ -225,6 +228,7 @@ func Online() Option {
Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(HeadMetricsKey, metrics.SendHeadNotifs("")),
Override(new(*discovery.Local), discovery.NewLocal),
Override(new(discovery.PeerResolver), modules.RetrievalResolver),
@ -289,6 +293,10 @@ func Config(cfg *config.Root) Option {
ApplyIf(func(s *Settings) bool { return s.Online },
Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)),
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull },
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),
),
)
}

View File

@ -6,6 +6,8 @@ import "time"
type Root struct {
API API
Libp2p Libp2p
Metrics Metrics
}
// API contains configs for API endpoint
@ -19,6 +21,10 @@ type Libp2p struct {
ListenAddresses []string
}
type Metrics struct {
Nickname string
}
// Default returns the default config
func Default() *Root {
def := Root{

View File

@ -50,6 +50,10 @@ func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.Block
return a.Chain.GetBlock(msg)
}
func (a *ChainAPI) ChainGetTipSet(ctx context.Context, cids []cid.Cid) (*types.TipSet, error) {
return a.Chain.LoadTipSet(cids)
}
func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) {
b, err := a.Chain.GetBlock(msg)
if err != nil {
@ -161,3 +165,12 @@ func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error
func (a *ChainAPI) ChainSetHead(ctx context.Context, ts *types.TipSet) error {
return a.Chain.SetHead(ts)
}
func (a *ChainAPI) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) {
genb, err := a.Chain.GetGenesis()
if err != nil {
return nil, err
}
return types.NewTipSet([]*types.BlockHeader{genb})
}

View File

@ -129,12 +129,12 @@ func (a *StateAPI) StateReplay(ctx context.Context, ts *types.TipSet, mc cid.Cid
}, nil
}
func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
func (a *StateAPI) stateForTs(ctx context.Context, ts *types.TipSet) (*state.StateTree, error) {
if ts == nil {
ts = a.Chain.GetHeaviestTipSet()
}
st, _, err := a.StateManager.TipSetState(ts)
st, _, err := a.StateManager.TipSetState(ctx, ts)
if err != nil {
return nil, err
}
@ -145,7 +145,7 @@ func (a *StateAPI) stateForTs(ts *types.TipSet) (*state.StateTree, error) {
}
func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) {
state, err := a.stateForTs(ts)
state, err := a.stateForTs(ctx, ts)
if err != nil {
return nil, err
}
@ -154,7 +154,7 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts
}
func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) {
state, err := a.stateForTs(ts)
state, err := a.stateForTs(ctx, ts)
if err != nil {
return nil, err
}

View File

@ -1,15 +1,20 @@
package modules
import (
"context"
"crypto/rand"
"io"
"io/ioutil"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peerstore"
record "github.com/libp2p/go-libp2p-record"
"go.uber.org/fx"
"golang.org/x/xerrors"
"io"
"io/ioutil"
"time"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/types"
@ -70,3 +75,46 @@ func APISecret(keystore types.KeyStore, lr repo.LockedRepo) (*dtypes.APIAlg, err
return (*dtypes.APIAlg)(jwt.NewHS256(key.PrivateKey)), nil
}
func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) {
ctx, cancel := context.WithCancel(mctx)
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
go func() {
for {
sctx, cancel := context.WithTimeout(ctx, build.BlockDelay*time.Second/2)
<-sctx.Done()
cancel()
if ctx.Err() != nil {
return
}
if len(host.Network().Conns()) > 0 {
continue
}
log.Warn("No peers connected, performing automatic bootstrap")
pis, err := build.BuiltinBootstrap()
if err != nil {
log.Error("Getting bootstrap addrs: ", err)
return
}
for _, pi := range pis {
if err := host.Connect(ctx, pi); err != nil {
log.Warn("bootstrap connect failed: ", err)
}
}
}
}()
return nil
},
OnStop: func(_ context.Context) error {
cancel()
return nil
},
})
}

View File

@ -8,12 +8,12 @@ import (
pnet "github.com/libp2p/go-libp2p-pnet"
)
var lotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e"
var LotusKey = "/key/swarm/psk/1.0.0/\n/base16/\n20c72398e6299c7bbc1b501fdcc8abe4f89f798e9b93b2d2bc02e3c29b6a088e"
type PNetFingerprint []byte
func PNet() (opts Libp2pOpts, fp PNetFingerprint, err error) {
protec, err := pnet.NewProtector(strings.NewReader(lotusKey))
protec, err := pnet.NewProtector(strings.NewReader(LotusKey))
if err != nil {
return opts, nil, fmt.Errorf("failed to configure private network: %s", err)
}

View File

@ -4,6 +4,8 @@ After=network.target
[Service]
ExecStart=/usr/local/bin/lotus daemon
Environment=GOLOG_FILE="/root/.lotus/logs"
Environment=GOLOG_LOG_FMT="json"
[Install]
WantedBy=multiuser.target
WantedBy=multiuser.target

66
scripts/filebeat.yml Normal file
View File

@ -0,0 +1,66 @@
############################# Filebeat #####################################
filebeat.inputs:
- type: log
paths:
- /root/.lotusstorage/logs
fields:
logzio_codec: json
token: <API TOKEN>
type: lotus-miner
fields_under_root: true
json.keys_under_root: false
encoding: utf-8
ignore_older: 3h
- type: log
paths:
- /root/.lotus/logs
fields:
logzio_codec: json
token: <API TOKEN>
type: lotus-daemon
fields_under_root: true
json.keys_under_root: false
encoding: utf-8
ignore_older: 3h
#For version 6.x and lower
#filebeat.registry_file: /var/lib/filebeat/registry
#For version 7 and higher
filebeat.registry.path: /var/lib/filebeat
#The following processors are to ensure compatibility with version 7
processors:
- rename:
fields:
- from: "agent"
to: "beat_agent"
ignore_missing: true
- rename:
fields:
- from: "log.file.path"
to: "source"
ignore_missing: true
- if:
has_fields: ['json.ts']
then:
- timestamp:
field: 'json.ts'
layouts:
- '2006-01-02T15:04:05.000Z0700'
test:
- '2019-10-10T22:37:48.297+0200'
- drop_fields:
fields: ['json.ts']
############################# Output ##########################################
output:
logstash:
hosts: ["listener.logz.io:5015"]
ssl:
certificate_authorities: ['/etc/pki/tls/certs/COMODORSADomainValidationSecureServerCA.crt']

View File

@ -4,6 +4,8 @@ After=network.target
[Service]
ExecStart=/usr/local/bin/lotus-storage-miner run
Environment=GOLOG_FILE="/root/.lotusstorage/logs"
Environment=GOLOG_LOG_FMT="json"
[Install]
WantedBy=multiuser.target

View File

@ -2,7 +2,7 @@ package storage
import (
"context"
"encoding/base64"
"time"
"golang.org/x/xerrors"
@ -67,14 +67,14 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
headPPE, provingPeriod := actors.ProvingPeriodEnd(ppe, ts.Height())
if headPPE > ppe {
log.Warn("PoSt computation running behind chain")
log.Warnw("PoSt computation running behind chain", "headPPE", headPPE, "ppe", ppe)
ppe = headPPE
}
m.schedLk.Lock()
if m.schedPost >= ppe {
// this probably can't happen
log.Error("PoSt already scheduled: %d >= %d", m.schedPost, ppe)
log.Errorw("PoSt already scheduled", "schedPost", m.schedPost, "ppe", ppe)
m.schedLk.Unlock()
return
}
@ -82,7 +82,8 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
m.schedPost = ppe
m.schedLk.Unlock()
log.Infof("Scheduling post at height %d (head=%d; ppe=%d, period=%d)", ppe-build.PoSTChallangeTime, ts.Height(), ppe, provingPeriod)
log.Infow("scheduling PoSt", "post-height", ppe-build.PoSTChallangeTime,
"height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod)
err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert
// TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run")
@ -90,7 +91,7 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
}, PoStConfidence, ppe-build.PoSTChallangeTime)
if err != nil {
// TODO: This is BAD, figure something out
log.Errorf("scheduling PoSt failed: %s", err)
log.Errorf("scheduling PoSt failed: %+v", err)
return
}
}
@ -110,14 +111,19 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro
return xerrors.Errorf("failed to get chain randomness for post (ts=%d; ppe=%d): %w", ts.Height(), ppe, err)
}
log.Infof("running PoSt computation, rh=%d r=%s, ppe=%d, h=%d", int64(ts.Height())-int64(ppe)+int64(build.PoSTChallangeTime), base64.StdEncoding.EncodeToString(r), ppe, ts.Height())
log.Infow("running PoSt", "delayed-by",
int64(ts.Height())-(int64(ppe)-int64(build.PoSTChallangeTime)),
"chain-random", r, "ppe", ppe, "height", ts.Height())
tsStart := time.Now()
var faults []uint64
proof, err := m.secst.RunPoSt(ctx, sset, r, faults)
if err != nil {
return xerrors.Errorf("running post failed: %w", err)
}
elapsed := time.Since(tsStart)
log.Infof("submitting PoSt pLen=%d", len(proof))
log.Infow("submitting PoSt", "pLen", len(proof), "elapsed", elapsed)
params := &actors.SubmitPoStParams{
Proof: proof,