From 9a536e7cc0e52d6b5e0b01a352bccabf5468ab2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 11 Oct 2019 05:16:12 +0200 Subject: [PATCH 01/23] config: Allow overriding bootstrap peers --- node/builder.go | 6 ++++++ node/config/def.go | 1 + node/modules/core.go | 21 +++++++++++++-------- node/modules/dtypes/bootstrap.go | 5 +++++ 4 files changed, 25 insertions(+), 8 deletions(-) create mode 100644 node/modules/dtypes/bootstrap.go diff --git a/node/builder.go b/node/builder.go index 54e75c6c7..33a11e1c1 100644 --- a/node/builder.go +++ b/node/builder.go @@ -203,6 +203,8 @@ func Online() Option { ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, // TODO: Fix offline mode + Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap), + Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), Override(new(*store.ChainStore), modules.ChainStore), @@ -294,6 +296,10 @@ func Config(cfg *config.Root) Option { ApplyIf(func(s *Settings) bool { return s.Online }, Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)), + ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 }, + Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)), + ), + ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull }, Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), ), diff --git a/node/config/def.go b/node/config/def.go index 11480a593..3b1020840 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -19,6 +19,7 @@ type API struct { // Libp2p contains configs for libp2p type Libp2p struct { ListenAddresses []string + BootstrapPeers []string } type Metrics struct { diff --git a/node/modules/core.go b/node/modules/core.go index bade468bf..8aaa52393 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/lib/addrutil" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log" @@ -76,7 +77,17 @@ func APISecret(keystore types.KeyStore, lr repo.LockedRepo) (*dtypes.APIAlg, err return (*dtypes.APIAlg)(jwt.NewHS256(key.PrivateKey)), nil } -func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) { +func ConfigBootstrap(peers []string) func() (dtypes.BootstrapPeers, error) { + return func() (dtypes.BootstrapPeers, error) { + return addrutil.ParseAddresses(context.TODO(), peers) + } +} + +func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { + return build.BuiltinBootstrap() +} + +func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, pinfos dtypes.BootstrapPeers) { ctx, cancel := context.WithCancel(mctx) lc.Append(fx.Hook{ @@ -97,13 +108,7 @@ func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) { log.Warn("No peers connected, performing automatic bootstrap") - pis, err := build.BuiltinBootstrap() - if err != nil { - log.Error("Getting bootstrap addrs: ", err) - return - } - - for _, pi := range pis { + for _, pi := range pinfos { if err := host.Connect(ctx, pi); err != nil { log.Warn("bootstrap connect failed: ", err) } diff --git a/node/modules/dtypes/bootstrap.go b/node/modules/dtypes/bootstrap.go new file mode 100644 index 000000000..0b46f63c3 --- /dev/null +++ b/node/modules/dtypes/bootstrap.go @@ -0,0 +1,5 @@ +package dtypes + +import "github.com/libp2p/go-libp2p-core/peer" + +type BootstrapPeers []peer.AddrInfo From 20cf5ba54c24c7867acb6305fd4399a6ea26aa17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 11 Oct 2019 08:11:57 +0200 Subject: [PATCH 02/23] townhall: Single-binary build --- .gitignore | 1 + Makefile | 7 +++++++ cmd/lotus-townhall/main.go | 4 +++- cmd/lotus-townhall/townhall/src/App.js | 4 ++-- 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 4e008cccc..9b0a66bcd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /lotus /lotus-storage-miner /pond +/townhall /lotuspond/front/node_modules /lotuspond/front/build **/*.h diff --git a/Makefile b/Makefile index ce08f30ef..441bc03fc 100644 --- a/Makefile +++ b/Makefile @@ -88,6 +88,13 @@ pond: build (cd lotuspond/front && npm i && npm run build) .PHONY: pond +townhall: + rm -f townhall + go build -o townhall ./cmd/lotus-townhall + (cd ./cmd/lotus-townhall/townhall && npm i && npm run build) + go run github.com/GeertJohan/go.rice/rice append --exec townhall -i ./cmd/lotus-townhall +.PHONY: townhall + clean: rm -rf $(CLEAN) -$(MAKE) -C $(BLS_PATH) clean diff --git a/cmd/lotus-townhall/main.go b/cmd/lotus-townhall/main.go index 64b7f423c..b7534d046 100644 --- a/cmd/lotus-townhall/main.go +++ b/cmd/lotus-townhall/main.go @@ -7,6 +7,7 @@ import ( "net/http" "strings" + rice "github.com/GeertJohan/go.rice" "github.com/gorilla/websocket" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peer" @@ -59,8 +60,9 @@ func main() { } http.HandleFunc("/sub", handler(ps)) + http.Handle("/", http.FileServer(rice.MustFindBox("townhall/build").HTTPBox())) - fmt.Println("listening") + fmt.Println("listening on http://localhost:2975") if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil { panic(err) diff --git a/cmd/lotus-townhall/townhall/src/App.js b/cmd/lotus-townhall/townhall/src/App.js index 426588d60..5b5ca5bca 100644 --- a/cmd/lotus-townhall/townhall/src/App.js +++ b/cmd/lotus-townhall/townhall/src/App.js @@ -5,8 +5,8 @@ class App extends React.Component { constructor(props) { super(props); - //let ws = new WebSocket("ws://" + window.location.host + "/sub") - let ws = new WebSocket("ws://127.0.0.1:2975/sub") + let ws = new WebSocket("ws://" + window.location.host + "/sub") + //let ws = new WebSocket("ws://127.0.0.1:2975/sub") ws.onmessage = (ev) => { console.log(ev) From a56e786e36b0c8af3ef47b4cd1b29243e3b1c32c Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Fri, 11 Oct 2019 18:13:04 +0900 Subject: [PATCH 03/23] some cleanup of vm transfer logic to make juan happier --- chain/stmgr/stmgr.go | 3 +-- chain/vm/vm.go | 38 +++++++++++++++++++++++++++----------- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 9a70cbd0a..5077b3dad 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -110,11 +110,10 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl return cid.Undef, cid.Undef, xerrors.Errorf("failed to get miner owner actor") } - if err := vm.DeductFunds(netact, reward); err != nil { + if err := vm.Transfer(netact, act, reward); err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("failed to deduct funds from network actor: %w", err) } - vm.DepositFunds(act, reward) } // TODO: can't use method from chainstore because it doesnt let us know who the block miners were diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 3498c9f69..1c4fbb158 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -396,10 +396,9 @@ func (vm *VM) send(ctx context.Context, msg *types.Message, parent *VMContext, return nil, aerrors.Wrap(aerr, "sending funds"), nil } - if err := DeductFunds(fromActor, msg.Value); err != nil { - return nil, aerrors.Absorb(err, 1, "failed to deduct funds"), nil + if err := Transfer(fromActor, toActor, msg.Value); err != nil { + return nil, aerrors.Absorb(err, 1, "failed to transfer funds"), nil } - DepositFunds(toActor, msg.Value) } if msg.Method != 0 { @@ -463,8 +462,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, if fromActor.Balance.LessThan(totalCost) { return nil, xerrors.Errorf("not enough funds (%s < %s)", fromActor.Balance, totalCost) } - if err := DeductFunds(fromActor, gascost); err != nil { - return nil, xerrors.Errorf("failed to deduct funds: %w", err) + + gasHolder := &types.Actor{Balance: types.NewInt(0)} + if err := Transfer(fromActor, gasHolder, gascost); err != nil { + return nil, xerrors.Errorf("failed to withdraw gas funds: %w", err) } if msg.Nonce != fromActor.Nonce { @@ -494,7 +495,9 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, // refund unused gas gasUsed = vmctx.GasUsed() refund := types.BigMul(types.BigSub(msg.GasLimit, gasUsed), msg.GasPrice) - DepositFunds(fromActor, refund) + if err := Transfer(gasHolder, fromActor, refund); err != nil { + return nil, xerrors.Errorf("failed to refund gas") + } } miner, err := st.GetActor(vm.blockMiner) @@ -503,7 +506,13 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet, } gasReward := types.BigMul(msg.GasPrice, gasUsed) - DepositFunds(miner, gasReward) + if err := Transfer(gasHolder, miner, gasReward); err != nil { + return nil, xerrors.Errorf("failed to give miner gas reward: %w", err) + } + + if types.BigCmp(types.NewInt(0), gasHolder.Balance) != 0 { + return nil, xerrors.Errorf("gas handling math is wrong") + } return &ApplyRet{ MessageReceipt: types.MessageReceipt{ @@ -597,10 +606,9 @@ func (vm *VM) TransferFunds(from, to address.Address, amt types.BigInt) error { return err } - if err := DeductFunds(fromAct, amt); err != nil { + if err := Transfer(fromAct, toAct, amt); err != nil { return xerrors.Errorf("failed to deduct funds: %w", err) } - DepositFunds(toAct, amt) return nil } @@ -628,7 +636,15 @@ func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params [ return ret, nil } -func DeductFunds(act *types.Actor, amt types.BigInt) error { +func Transfer(from, to *types.Actor, amt types.BigInt) error { + if err := deductFunds(from, amt); err != nil { + return err + } + depositFunds(to, amt) + return nil +} + +func deductFunds(act *types.Actor, amt types.BigInt) error { if act.Balance.LessThan(amt) { return fmt.Errorf("not enough funds") } @@ -637,7 +653,7 @@ func DeductFunds(act *types.Actor, amt types.BigInt) error { return nil } -func DepositFunds(act *types.Actor, amt types.BigInt) { +func depositFunds(act *types.Actor, amt types.BigInt) { act.Balance = types.BigAdd(act.Balance, amt) } From edd7085712146e1934f16077047d53180e6fb3e0 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 11 Oct 2019 18:02:21 +0200 Subject: [PATCH 04/23] Update filebeat Change log of rpc buffer as I want to set up alert when it goes to high License: MIT Signed-off-by: Jakub Sztandera --- lib/jsonrpc/client.go | 2 +- scripts/filebeat.yml | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/jsonrpc/client.go b/lib/jsonrpc/client.go index 4530bfd34..7218d087e 100644 --- a/lib/jsonrpc/client.go +++ b/lib/jsonrpc/client.go @@ -163,7 +163,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int) buf.PushBack(val) if buf.Len() > 1 { - log.Warnf("rpc output channel has %d buffered messages", buf.Len()) + log.Warnw("rpc output message buffer", "n", buf.Len()) bufLk.Unlock() return } diff --git a/scripts/filebeat.yml b/scripts/filebeat.yml index 0a027b655..38c9adbc7 100644 --- a/scripts/filebeat.yml +++ b/scripts/filebeat.yml @@ -11,6 +11,8 @@ filebeat.inputs: type: lotus-miner fields_under_root: true json.keys_under_root: false + json.message_key: msg + encoding: utf-8 ignore_older: 3h - type: log @@ -22,6 +24,7 @@ filebeat.inputs: type: lotus-daemon fields_under_root: true json.keys_under_root: false + json.message_key: msg encoding: utf-8 ignore_older: 3h @@ -55,6 +58,13 @@ processors: - '2019-10-10T22:37:48.297+0200' - drop_fields: fields: ['json.ts'] +- if: + has_fields: ['json.msg'] + then: + - rename: + fields: + - from: 'json.msg' + to: 'message' ############################# Output ########################################## From 3481048431f48f72269651e5711f24406f4be307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 12 Oct 2019 01:13:07 +0200 Subject: [PATCH 05/23] fix events deadlock when chaining calls --- chain/events/events_height.go | 3 ++ chain/events/events_test.go | 68 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 0458cc4de..eded0d0c2 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -132,9 +132,12 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err) } + e.lk.Unlock() if err := hnd(ts, bestH); err != nil { return err } + e.lk.Lock() + bestH = e.tsc.best().Height() } if bestH >= h+uint64(confidence)+e.gcConfidence { diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 7706d8f47..a74417f0e 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -364,6 +364,74 @@ func TestAtStartConfidence(t *testing.T) { require.Equal(t, false, reverted) } +func TestAtChained(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + return events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 10, int(ts.Height())) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 10) + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + fcs.advance(0, 15, nil) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + +func TestAtChainedConfidence(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + fcs.advance(0, 15, nil) + + var applied bool + var reverted bool + + err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { + return events.ChainAt(func(ts *types.TipSet, curH uint64) error { + require.Equal(t, 10, int(ts.Height())) + applied = true + return nil + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 10) + }, func(ts *types.TipSet) error { + reverted = true + return nil + }, 3, 5) + require.NoError(t, err) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) +} + func TestCalled(t *testing.T) { fcs := &fakeCS{ t: t, From 5861332f409ee4914a1b65eec021862dfe15edee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 12 Oct 2019 01:47:29 +0200 Subject: [PATCH 06/23] sminer: info command --- cmd/lotus-storage-miner/info.go | 34 +++++++++++++++++++++++++++++++++ cmd/lotus-storage-miner/main.go | 1 + 2 files changed, 35 insertions(+) create mode 100644 cmd/lotus-storage-miner/info.go diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go new file mode 100644 index 000000000..702af2c2b --- /dev/null +++ b/cmd/lotus-storage-miner/info.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + + "gopkg.in/urfave/cli.v2" + + lcli "github.com/filecoin-project/go-lotus/cli" +) + +var infoCmd = &cli.Command{ + Name: "info", + Usage: "Print storage miner info", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + aaddr, err := nodeApi.ActorAddresses(ctx) + if err != nil { + return err + } + + fmt.Printf("actor address: %s\n", aaddr) + // TODO: grab actr state / info + // * Sector size + // * Sealed sectors (count / bytes) + // * Power + return nil + }, +} diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index 131a4b42d..b7360f7e8 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -21,6 +21,7 @@ func main() { local := []*cli.Command{ runCmd, initCmd, + infoCmd, storeGarbageCmd, sectorsCmd, } From edcf47ff3149f0f57558145ddcf00088f82f4821 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 12 Oct 2019 15:45:48 +0900 Subject: [PATCH 07/23] implement commands to list actors and miners --- api/api.go | 2 ++ api/struct.go | 9 +++++ chain/actors/actor_storagemarket.go | 22 ++++++++++++ chain/stmgr/stmgr.go | 31 +++++++++++++++++ cli/state.go | 52 +++++++++++++++++++++++++++++ node/impl/full/state.go | 19 +++++++++++ 6 files changed, 135 insertions(+) diff --git a/api/api.go b/api/api.go index 1dff49e13..cb1c72bef 100644 --- a/api/api.go +++ b/api/api.go @@ -129,6 +129,8 @@ type FullNode interface { StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) + StateListMiners(context.Context, *types.TipSet) ([]address.Address, error) + StateListActors(context.Context, *types.TipSet) ([]address.Address, error) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychList(context.Context) ([]address.Address, error) diff --git a/api/struct.go b/api/struct.go index 1a6068dc4..b1df124b2 100644 --- a/api/struct.go +++ b/api/struct.go @@ -95,6 +95,8 @@ type FullNodeStruct struct { StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` + StateListMiners func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` + StateListActors func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"` PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"` @@ -368,6 +370,13 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, ts *types.Ti func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { return c.Internal.StateWaitMsg(ctx, msgc) } +func (c *FullNodeStruct) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { + return c.Internal.StateListMiners(ctx, ts) +} + +func (c *FullNodeStruct) StateListActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { + return c.Internal.StateListActors(ctx, ts) +} func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) { return c.Internal.PaychGet(ctx, from, to, ensureFunds) diff --git a/chain/actors/actor_storagemarket.go b/chain/actors/actor_storagemarket.go index f4d3d3d58..e595be949 100644 --- a/chain/actors/actor_storagemarket.go +++ b/chain/actors/actor_storagemarket.go @@ -450,6 +450,28 @@ func MinerSetHas(ctx context.Context, vmctx types.VMContext, rcid cid.Cid, maddr } } +func MinerSetList(ctx context.Context, cst *hamt.CborIpldStore, rcid cid.Cid) ([]address.Address, error) { + nd, err := hamt.LoadNode(ctx, cst, rcid) + if err != nil { + return nil, xerrors.Errorf("failed to load miner set: %w", err) + } + + var out []address.Address + err = nd.ForEach(ctx, func(k string, val interface{}) error { + addr, err := address.NewFromBytes([]byte(k)) + if err != nil { + return err + } + out = append(out, addr) + return nil + }) + if err != nil { + return nil, err + } + + return out, nil +} + func MinerSetAdd(ctx context.Context, vmctx types.VMContext, rcid cid.Cid, maddr address.Address) (cid.Cid, aerrors.ActorError) { nd, err := hamt.LoadNode(ctx, vmctx.Ipld(), rcid) if err != nil { diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 9a70cbd0a..1c8f52ed8 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -426,3 +426,34 @@ func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*t return nil, nil } + +func (sm *StateManager) ListAllActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { + if ts == nil { + ts = sm.ChainStore().GetHeaviestTipSet() + } + st, _, err := sm.TipSetState(ctx, ts) + if err != nil { + return nil, err + } + + cst := hamt.CSTFromBstore(sm.ChainStore().Blockstore()) + r, err := hamt.LoadNode(ctx, cst, st) + if err != nil { + return nil, err + } + + var out []address.Address + err = r.ForEach(ctx, func(k string, val interface{}) error { + addr, err := address.NewFromBytes([]byte(k)) + if err != nil { + return xerrors.Errorf("address in state tree was not valid: %w", err) + } + out = append(out, addr) + return nil + }) + if err != nil { + return nil, err + } + + return out, nil +} diff --git a/cli/state.go b/cli/state.go index 7984694f4..5152b1215 100644 --- a/cli/state.go +++ b/cli/state.go @@ -18,6 +18,8 @@ var stateCmd = &cli.Command{ stateSectorsCmd, stateProvingSetCmd, statePledgeCollateralCmd, + stateListActorsCmd, + stateListMinersCmd, }, } @@ -199,3 +201,53 @@ var statePledgeCollateralCmd = &cli.Command{ return nil }, } + +var stateListMinersCmd = &cli.Command{ + Name: "list-miners", + Usage: "list all miners in the network", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + miners, err := api.StateListMiners(ctx, nil) + if err != nil { + return err + } + + for _, m := range miners { + fmt.Println(m.String()) + } + + return nil + }, +} + +var stateListActorsCmd = &cli.Command{ + Name: "list-actors", + Usage: "list all actors in the network", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + actors, err := api.StateListActors(ctx, nil) + if err != nil { + return err + } + + for _, a := range actors { + fmt.Println(a.String()) + } + + return nil + }, +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go index efb946e3d..b8dd07742 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -207,3 +207,22 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, TipSet: ts, }, nil } + +func (a *StateAPI) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { + var state actors.StorageMarketState + if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil { + return nil, err + } + + cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore()) + miners, err := actors.MinerSetList(ctx, cst, state.Miners) + if err != nil { + return nil, err + } + + return miners, nil +} + +func (a *StateAPI) StateListActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { + return a.StateManager.ListAllActors(ctx, ts) +} From 80d306de1441a029e9b909a3518d43dc5b963865 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 12 Oct 2019 15:57:49 +0900 Subject: [PATCH 08/23] check transfer amounts are positive values --- chain/vm/vm.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/chain/vm/vm.go b/chain/vm/vm.go index 1c4fbb158..cf9a52f4d 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -637,6 +637,10 @@ func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params [ } func Transfer(from, to *types.Actor, amt types.BigInt) error { + if amt.LessThan(types.NewInt(0)) { + return xerrors.Errorf("attempted to transfer negative value") + } + if err := deductFunds(from, amt); err != nil { return err } From 67d6437fa1de303f857fa3e64dd02b202d891530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 12 Oct 2019 09:21:28 +0200 Subject: [PATCH 09/23] client: query ask total price calc --- cli/client.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/cli/client.go b/cli/client.go index 8b7eda72c..23dcc43cd 100644 --- a/cli/client.go +++ b/cli/client.go @@ -241,6 +241,14 @@ var clientQueryAskCmd = &cli.Command{ Name: "peerid", Usage: "specify peer ID of node to make query against", }, + &cli.Int64Flag{ + Name: "size", + Usage: "data size in bytes", + }, + &cli.Int64Flag{ + Name: "duration", + Usage: "deal duration", + }, }, Action: func(cctx *cli.Context) error { if cctx.NArg() != 1 { @@ -295,7 +303,20 @@ var clientQueryAskCmd = &cli.Command{ } fmt.Printf("Ask: %s\n", maddr) - fmt.Printf("Price: %s\n", ask.Ask.Price) + fmt.Printf("Price per Byte: %s\n", ask.Ask.Price) + + size := cctx.Int64("size") + if size == 0 { + return nil + } + fmt.Printf("Price per Block: %s\n", types.BigMul(ask.Ask.Price, types.NewInt(uint64(size)))) + + duration := cctx.Int64("duration") + if duration == 0 { + return nil + } + fmt.Printf("Total Price: %s\n", types.BigMul(types.BigMul(ask.Ask.Price, types.NewInt(uint64(size))), types.NewInt(uint64(duration)))) + return nil }, } From 30198b4105f84494d7eaf63fdce59c81dc67fe23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 12 Oct 2019 16:05:41 +0200 Subject: [PATCH 10/23] townhall: show tipset propagation --- cmd/lotus-townhall/townhall/src/App.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-townhall/townhall/src/App.js b/cmd/lotus-townhall/townhall/src/App.js index 5b5ca5bca..058e9e32b 100644 --- a/cmd/lotus-townhall/townhall/src/App.js +++ b/cmd/lotus-townhall/townhall/src/App.js @@ -25,7 +25,9 @@ class App extends React.Component { console.log(best) return {Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => { - let l = [, , ] + + let mnrs = v.Blocks.map(b =>  m:{b.Miner}) + let l = [, , , ] if (best !== v.Height) { l = {l} } else { From 084d68a6b345be3522fefeeef9a5bd9471049fc3 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 13 Oct 2019 01:10:24 +0200 Subject: [PATCH 11/23] Add height and count flags to chain list License: MIT Signed-off-by: Jakub Sztandera --- cli/chain.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/cli/chain.go b/cli/chain.go index e16fadef7..6627337b6 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -276,6 +276,10 @@ func parseTipSet(api api.FullNode, ctx context.Context, vals []string) (*types.T var chainListCmd = &cli.Command{ Name: "list", Usage: "View a segment of the chain", + Flags: []cli.Flag{ + &cli.Uint64Flag{Name: "height"}, + &cli.UintFlag{Name: "count", Value: 30}, + }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { @@ -289,9 +293,22 @@ var chainListCmd = &cli.Command{ return err } - tss := []*types.TipSet{head} cur := head - for i := 1; i < 30; i++ { + + for cctx.IsSet("height") && cur.Height() > cctx.Uint64("height") { + if cur.Height() == 0 { + break + } + var err error + cur, err = api.ChainGetTipSet(ctx, cur.Parents()) + if err != nil { + return err + } + } + + tss := []*types.TipSet{cur} + + for i := uint(1); i < cctx.Uint("count"); i++ { if cur.Height() == 0 { break } From e11deb955162fe86356e371dd5437a141a43f628 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 13 Oct 2019 01:18:32 +0200 Subject: [PATCH 12/23] Add bughunt Error log License: MIT Signed-off-by: Jakub Sztandera --- storage/post.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/storage/post.go b/storage/post.go index 0b49ff509..ea6521210 100644 --- a/storage/post.go +++ b/storage/post.go @@ -97,7 +97,13 @@ func (m *Miner) scheduleNextPost(ppe uint64) { } func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error { + called := 0 return func(ts *types.TipSet, curH uint64) error { + if called > 0 { + log.Errorw("BUG: computePost callback called again", "ppe", ppe, + "height", ts.Height(), "curH", curH, "called", called) + } + called++ ctx := context.TODO() From 531e9d0bebeb3dec46a6233e7cc5d4b91f321a76 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 13 Oct 2019 02:55:45 +0200 Subject: [PATCH 13/23] Cleanup chain list by height License: MIT Signed-off-by: Jakub Sztandera --- cli/chain.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/cli/chain.go b/cli/chain.go index 6627337b6..3782d42d5 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -288,38 +288,36 @@ var chainListCmd = &cli.Command{ defer closer() ctx := ReqContext(cctx) - head, err := api.ChainHead(ctx) + var head *types.TipSet + + if cctx.IsSet("height") { + head, err = api.ChainGetTipSetByHeight(ctx, cctx.Uint64("height"), nil) + } else { + head, err = api.ChainHead(ctx) + } if err != nil { return err } - cur := head - - for cctx.IsSet("height") && cur.Height() > cctx.Uint64("height") { - if cur.Height() == 0 { - break - } - var err error - cur, err = api.ChainGetTipSet(ctx, cur.Parents()) - if err != nil { - return err - } + count := cctx.Uint("count") + if count < 1 { + return nil } - tss := []*types.TipSet{cur} + tss := make([]*types.TipSet, count) + tss[0] = head - for i := uint(1); i < cctx.Uint("count"); i++ { - if cur.Height() == 0 { + for i := 1; i < len(tss); i++ { + if head.Height() == 0 { break } - next, err := api.ChainGetTipSet(ctx, cur.Parents()) + head, err = api.ChainGetTipSet(ctx, head.Parents()) if err != nil { return err } - tss = append(tss, next) - cur = next + tss[i] = head } for i := len(tss) - 1; i >= 0; i-- { From 45737f8a5178bb389d028258bdc843733a06507d Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sat, 12 Oct 2019 18:44:56 +0900 Subject: [PATCH 14/23] add more tracing spans License: MIT Signed-off-by: Jakub Sztandera --- chain/blocksync.go | 30 ++++++++++++++++++++++++++++-- chain/stmgr/stmgr.go | 5 +++++ chain/sync.go | 32 ++++++++++++++++++++++++++------ chain/vm/vm.go | 3 +++ 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index 82e91afe9..b2e65a9c2 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -10,6 +10,7 @@ import ( bserv "github.com/ipfs/go-blockservice" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" + "go.opencensus.io/trace" "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/store" @@ -86,6 +87,9 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService { } func (bss *BlockSyncService) HandleStream(s inet.Stream) { + ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream") + defer span.End() + defer s.Close() var req BlockSyncRequest @@ -95,7 +99,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { } log.Infof("block sync request for: %s %d", req.Start, req.RequestLength) - resp, err := bss.processRequest(&req) + resp, err := bss.processRequest(ctx, &req) if err != nil { log.Error("failed to process block sync request: ", err) return @@ -107,7 +111,10 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) { } } -func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) { +func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) { + ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest") + defer span.End() + opts := ParseBSOptions(req.Options) if len(req.Start) == 0 { return &BlockSyncResponse{ @@ -116,6 +123,13 @@ func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncRe }, nil } + if span.IsRecordingEvents() { + span.AddAttributes( + trace.BoolAttribute("blocks", opts.IncludeBlocks), + trace.BoolAttribute("messages", opts.IncludeMessages), + ) + } + chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts) if err != nil { log.Error("encountered error while responding to block sync request: ", err) @@ -253,6 +267,15 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse } func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) { + ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes( + trace.StringAttribute("tipset", fmt.Sprint(tipset)), + trace.Int64Attribute("count", int64(count)), + ) + } + peers := bs.getPeers() perm := rand.Perm(len(peers)) // TODO: round robin through these peers on error @@ -321,6 +344,9 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) } func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) { + ctx, span := trace.StartSpan(ctx, "GetChainMessages") + defer span.End() + peers := bs.getPeers() perm := rand.Perm(len(peers)) // TODO: round robin through these peers on error diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 870c32c1b..86a09ecb0 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -47,12 +47,17 @@ func cidsToKey(cids []cid.Cid) string { } func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) { + ctx, span := trace.StartSpan(ctx, "tipSetState") + defer span.End() ck := cidsToKey(ts.Cids()) sm.stlk.Lock() cached, ok := sm.stCache[ck] sm.stlk.Unlock() if ok { + if span.IsRecordingEvents() { + span.AddAttributes(trace.BoolAttribute("cache", true)) + } return cached[0], cached[1], nil } diff --git a/chain/sync.go b/chain/sync.go index 4ff9ad54f..7f1db0d27 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -352,6 +352,9 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { } func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) error { + ctx, span := trace.StartSpan(ctx, "validateTipSet") + defer span.End() + ts := fts.TipSet() if ts.Equals(syncer.Genesis) { return nil @@ -423,6 +426,8 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre // Should match up with 'Semantical Validation' in validation.md in the spec func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error { + ctx, span := trace.StartSpan(ctx, "validateBlock") + defer span.End() h := b.Header @@ -606,6 +611,15 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu } func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { + ctx, span := trace.StartSpan(ctx, "collectHeaders") + defer span.End() + if span.IsRecordingEvents() { + span.AddAttributes( + trace.Int64Attribute("fromHeight", int64(from.Height())), + trace.Int64Attribute("toHeight", int64(to.Height())), + ) + } + blockSet := []*types.TipSet{from} at := from.Parents() @@ -647,7 +661,7 @@ loop: if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window { window = gap } - blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, window) + blks, err := syncer.Bsync.GetBlocks(ctx, at, window) if err != nil { // Most likely our peers aren't fully synced yet, but forwarded // new block message (ideally we'd find better peers) @@ -726,7 +740,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error { syncer.syncState.SetHeight(0) - return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error { + return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error { log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids())) if err := syncer.ValidateTipSet(ctx, fts); err != nil { log.Errorf("failed to validate tipset: %+v", err) @@ -740,7 +754,10 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []* } // fills out each of the given tipsets with messages and calls the callback with it -func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error { +func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error { + ctx, span := trace.StartSpan(ctx, "iterFullTipsets") + defer span.End() + beg := len(headers) - 1 // handle case where we have a prefix of these locally for ; beg >= 0; beg-- { @@ -751,7 +768,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu if fts == nil { break } - if err := cb(fts); err != nil { + if err := cb(ctx, fts); err != nil { return err } } @@ -767,7 +784,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu } next := headers[i-batchSize] - bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1)) + bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1)) if err != nil { return xerrors.Errorf("message processing failed: %w", err) } @@ -788,7 +805,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu return xerrors.Errorf("message processing failed: %w", err) } - if err := cb(fts); err != nil { + if err := cb(ctx, fts); err != nil { return err } @@ -828,6 +845,9 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error { } func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error { + ctx, span := trace.StartSpan(ctx, "collectChain") + defer span.End() + syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts) headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet()) diff --git a/chain/vm/vm.go b/chain/vm/vm.go index cf9a52f4d..89da57c65 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -538,6 +538,9 @@ func (vm *VM) ActorBalance(addr address.Address) (types.BigInt, aerrors.ActorErr } func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) { + ctx, span := trace.StartSpan(ctx, "vm.Flush") + defer span.End() + from := dag.NewDAGService(bserv.New(vm.buf, nil)) to := dag.NewDAGService(bserv.New(vm.buf.Read(), nil)) From c63eca1e45ace284a07fd46582bbda7cee551680 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 13 Oct 2019 09:41:24 +0900 Subject: [PATCH 15/23] if not needed License: MIT Signed-off-by: Jakub Sztandera --- chain/blocksync.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/chain/blocksync.go b/chain/blocksync.go index b2e65a9c2..50d24196a 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -123,12 +123,10 @@ func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncR }, nil } - if span.IsRecordingEvents() { - span.AddAttributes( - trace.BoolAttribute("blocks", opts.IncludeBlocks), - trace.BoolAttribute("messages", opts.IncludeMessages), - ) - } + span.AddAttributes( + trace.BoolAttribute("blocks", opts.IncludeBlocks), + trace.BoolAttribute("messages", opts.IncludeMessages), + ) chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts) if err != nil { From 68db93b62e3b63d52908c8f276775ffd6cf9afe0 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 13 Oct 2019 03:05:43 +0200 Subject: [PATCH 16/23] More if not needed License: MIT Signed-off-by: Jakub Sztandera --- chain/stmgr/stmgr.go | 4 +--- chain/sync.go | 11 +++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 86a09ecb0..9a07c2ce6 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -55,9 +55,7 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid. cached, ok := sm.stCache[ck] sm.stlk.Unlock() if ok { - if span.IsRecordingEvents() { - span.AddAttributes(trace.BoolAttribute("cache", true)) - } + span.AddAttributes(trace.BoolAttribute("cache", true)) return cached[0], cached[1], nil } diff --git a/chain/sync.go b/chain/sync.go index 7f1db0d27..573563554 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -613,12 +613,11 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() - if span.IsRecordingEvents() { - span.AddAttributes( - trace.Int64Attribute("fromHeight", int64(from.Height())), - trace.Int64Attribute("toHeight", int64(to.Height())), - ) - } + + span.AddAttributes( + trace.Int64Attribute("fromHeight", int64(from.Height())), + trace.Int64Attribute("toHeight", int64(to.Height())), + ) blockSet := []*types.TipSet{from} From 5a733f282ed69aa2e7a09bac2f756af195c61f5a Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 13 Oct 2019 22:03:15 +0900 Subject: [PATCH 17/23] Some simple measures to avoid mpool DoSing --- chain/messagepool.go | 81 +++++++++++++++++++++++++++++++----- chain/types/bigint.go | 2 + chain/types/signedmessage.go | 10 +++++ 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/chain/messagepool.go b/chain/messagepool.go index b4bacd7a5..0afde83c1 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -1,6 +1,7 @@ package chain import ( + "fmt" "sync" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -12,19 +13,35 @@ import ( "github.com/filecoin-project/go-lotus/chain/types" ) +var ( + ErrMessageTooBig = fmt.Errorf("message too big") + + ErrMessageValueTooHigh = fmt.Errorf("cannot send more filecoin than will ever exist") + + ErrNonceTooLow = fmt.Errorf("message nonce too low") + + ErrNotEnoughFunds = fmt.Errorf("not enough funds to execute transaction") +) + type MessagePool struct { lk sync.Mutex - pending map[address.Address]*msgSet + pending map[address.Address]*msgSet + pendingCount int sm *stmgr.StateManager ps *pubsub.PubSub + + minGasPrice types.BigInt + + maxTxPoolSize int } type msgSet struct { - msgs map[uint64]*types.SignedMessage - nextNonce uint64 + msgs map[uint64]*types.SignedMessage + nextNonce uint64 + curBalance types.BigInt } func newMsgSet() *msgSet { @@ -51,9 +68,11 @@ func (ms *msgSet) add(m *types.SignedMessage) error { func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { mp := &MessagePool{ - pending: make(map[address.Address]*msgSet), - sm: sm, - ps: ps, + pending: make(map[address.Address]*msgSet), + sm: sm, + ps: ps, + minGasPrice: types.NewInt(0), + maxTxPoolSize: 100000, } sm.ChainStore().SubscribeHeadChanges(mp.HeadChange) @@ -74,6 +93,38 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error { } func (mp *MessagePool) Add(m *types.SignedMessage) error { + // big messages are bad, anti DOS + if m.Size() > 32*1024 { + return ErrMessageTooBig + } + + if !m.Message.Value.LessThan(types.TotalFilecoinInt) { + return ErrMessageValueTooHigh + } + + if err := m.Signature.Verify(m.Message.From, m.Message.Cid().Bytes()); err != nil { + log.Warnf("mpooladd signature verification failed: %s", err) + return err + } + + snonce, err := mp.getStateNonce(m.Message.From) + if err != nil { + return xerrors.Errorf("failed to look up actor state nonce: %w", err) + } + + if snonce > m.Message.Nonce { + return ErrNonceTooLow + } + + balance, err := mp.getStateBalance(m.Message.From) + if err != nil { + return xerrors.Errorf("failed to check sender balance: %w", err) + } + + if balance.LessThan(m.Message.RequiredFunds()) { + return ErrNotEnoughFunds + } + mp.lk.Lock() defer mp.lk.Unlock() @@ -83,11 +134,6 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { func (mp *MessagePool) addLocked(m *types.SignedMessage) error { log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce) - if err := m.Signature.Verify(m.Message.From, m.Message.Cid().Bytes()); err != nil { - log.Warnf("mpooladd signature verification failed: %s", err) - return err - } - if _, err := mp.sm.ChainStore().PutMessage(m); err != nil { log.Warnf("mpooladd cs.PutMessage failed: %s", err) return err @@ -116,6 +162,10 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return mset.nextNonce, nil } + return mp.getStateNonce(addr) +} + +func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) { act, err := mp.sm.GetActor(addr, nil) if err != nil { return 0, err @@ -124,6 +174,15 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { return act.Nonce, nil } +func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) { + act, err := mp.sm.GetActor(addr, nil) + if err != nil { + return types.EmptyInt, err + } + + return act.Balance, nil +} + func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) { mp.lk.Lock() defer mp.lk.Unlock() diff --git a/chain/types/bigint.go b/chain/types/bigint.go index a1c4e8982..329781b9f 100644 --- a/chain/types/bigint.go +++ b/chain/types/bigint.go @@ -16,6 +16,8 @@ import ( const BigIntMaxSerializedLen = 128 // is this big enough? or too big? +var TotalFilecoinInt = FromFil(build.TotalFilecoin) + func init() { cbor.RegisterCborType(atlas.BuildEntry(BigInt{}).Transform(). TransformMarshal(atlas.MakeMarshalTransformFunc( diff --git a/chain/types/signedmessage.go b/chain/types/signedmessage.go index a08f4fba8..0ede20a4f 100644 --- a/chain/types/signedmessage.go +++ b/chain/types/signedmessage.go @@ -62,6 +62,16 @@ func (sm *SignedMessage) Serialize() ([]byte, error) { return buf.Bytes(), nil } +func (sm *SignedMessage) Size() int { + serdata, err := sm.Serialize() + if err != nil { + log.Errorf("serializing message failed: %s", err) + return 0 + } + + return len(serdata) +} + func (sm *SignedMessage) VMMessage() *Message { return &sm.Message } From 097e569792c33c35e14d20b95e32c44f4a9f3914 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Sun, 13 Oct 2019 22:16:01 +0900 Subject: [PATCH 18/23] remove not found log, its okay --- chain/gen/gen.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/chain/gen/gen.go b/chain/gen/gen.go index a6f454ed6..9e1c8aa2a 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -67,9 +67,6 @@ type mybs struct { func (m mybs) Get(c cid.Cid) (block.Block, error) { b, err := m.Blockstore.Get(c) if err != nil { - // change to error for stacktraces, don't commit with that pls - // TODO: debug why we get so many not founds in tests - log.Warnf("Get failed: %s %s", c, err) return nil, err } From 55e7a484cc29edd88c130895bf00b1992a9dcb04 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 14 Oct 2019 10:44:11 +0900 Subject: [PATCH 19/23] improve tracing doc --- docs/tracing.md | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/docs/tracing.md b/docs/tracing.md index ee45601a5..35f6ffb0f 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -1,9 +1,30 @@ ## Tracing -Lotus uses Jaeger for tracing. Currently it always uses -localhost and default port (`localhost:6831`). +Lotus uses [OpenCensus](https://opencensus.io/) for tracing application flow. +This generates spans +through the execution of annotated code paths. -During dev you can use `jaeger-all-in-one` from: https://www.jaegertracing.io/download/ -Start the `jaeger-all-in-one` and open http://localhost:16686/ to view traces. -j -In production: you tell me and I might WTFM. +Currently it is set up to use jaeger, though other tracing backends should be +fairly easy to swap in. + +## Running Locally + +To easily run and view tracing locally, first, install jaeger. The easiest way +to do this is to download the binaries from +https://www.jaegertracing.io/download/ and then run the `jaeger-all-in-one` +binary. This will start up jaeger, listen for spans on `localhost:6831`, and +expose a web UI for viewing traces on `http://localhost:16686/`. + +Now, to start sending traces from lotus to jaeger, set the environment variable +`LOTUS_JAEGER` to `localhost:6831`, and start the `lotus daemon`. + +Now, to view any generated traces, open up `http://localhost:16686/` in your +browser. + +## Adding Spans +To annotate a new codepath with spans, add the following lines to the top of the function you wish to trace: + +```go +ctx, span := trace.StartSpan(ctx, "put function name here") +defer span.End() +``` From 62057a8929e62369c874b2e9e17823d372e1104f Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 14 Oct 2019 12:28:19 +0900 Subject: [PATCH 20/23] reject messages with bad 'To' addresses --- chain/messagepool.go | 6 ++++++ chain/sync.go | 21 +++++++++++++++++++-- miner/miner.go | 5 +++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/chain/messagepool.go b/chain/messagepool.go index 0afde83c1..845447af8 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -21,6 +21,8 @@ var ( ErrNonceTooLow = fmt.Errorf("message nonce too low") ErrNotEnoughFunds = fmt.Errorf("not enough funds to execute transaction") + + ErrInvalidToAddr = fmt.Errorf("message had invalid to address") ) type MessagePool struct { @@ -98,6 +100,10 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { return ErrMessageTooBig } + if m.Message.To == address.Undef { + return ErrInvalidToAddr + } + if !m.Message.Value.LessThan(types.TotalFilecoinInt) { return ErrMessageValueTooHigh } diff --git a/chain/sync.go b/chain/sync.go index 573563554..6826d5592 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -493,9 +493,22 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return xerrors.Errorf("miner created a block but was not a winner") } + if err := syncer.checkBlockMessages(ctx, b, baseTs); err != nil { + return xerrors.Errorf("block had invalid messages: %w", err) + } + + return nil +} + +func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock, baseTs *types.TipSet) error { nonces := make(map[address.Address]uint64) balances := make(map[address.Address]types.BigInt) + stateroot, _, err := syncer.sm.TipSetState(ctx, baseTs) + if err != nil { + return err + } + cst := hamt.CSTFromBstore(syncer.store.Blockstore()) st, err := state.LoadStateTree(cst, stateroot) if err != nil { @@ -503,6 +516,10 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err } checkMsg := func(m *types.Message) error { + if m.To == address.Undef { + return xerrors.New("'To' address cannot be empty") + } + if _, ok := nonces[m.From]; !ok { act, err := st.GetActor(m.From) if err != nil { @@ -547,7 +564,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err pubks = append(pubks, pubk) } - if err := syncer.verifyBlsAggregate(h.BLSAggregate, sigCids, pubks); err != nil { + if err := syncer.verifyBlsAggregate(b.Header.BLSAggregate, sigCids, pubks); err != nil { return xerrors.Errorf("bls aggregate signature was invalid: %w", err) } @@ -588,7 +605,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err return err } - if h.Messages != mrcid { + if b.Header.Messages != mrcid { return fmt.Errorf("messages didnt match message root in header") } diff --git a/miner/miner.go b/miner/miner.go index 9ef74787d..65b1d69e0 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -323,6 +323,11 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs inclNonces := make(map[address.Address]uint64) inclBalances := make(map[address.Address]types.BigInt) for _, msg := range msgs { + if msg.Message.To == address.Undef { + log.Warnf("message in mempool had bad 'To' address") + continue + } + from := msg.Message.From act, err := al(ctx, from, base.ts) if err != nil { From f478267fd1e2bff3827426f29a8f628013f06fb5 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 14 Oct 2019 13:32:55 +0900 Subject: [PATCH 21/23] fix tests --- miner/miner_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/miner/miner_test.go b/miner/miner_test.go index dc79bf129..5bbda8318 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -40,6 +40,7 @@ func TestMessageFiltering(t *testing.T) { msgs := []types.Message{ types.Message{ From: a1, + To: a1, Nonce: 3, Value: types.NewInt(500), GasLimit: types.NewInt(50), @@ -47,6 +48,7 @@ func TestMessageFiltering(t *testing.T) { }, types.Message{ From: a1, + To: a1, Nonce: 4, Value: types.NewInt(500), GasLimit: types.NewInt(50), @@ -54,6 +56,7 @@ func TestMessageFiltering(t *testing.T) { }, types.Message{ From: a2, + To: a1, Nonce: 1, Value: types.NewInt(800), GasLimit: types.NewInt(100), @@ -61,6 +64,7 @@ func TestMessageFiltering(t *testing.T) { }, types.Message{ From: a2, + To: a1, Nonce: 0, Value: types.NewInt(800), GasLimit: types.NewInt(100), @@ -68,6 +72,7 @@ func TestMessageFiltering(t *testing.T) { }, types.Message{ From: a2, + To: a1, Nonce: 2, Value: types.NewInt(150), GasLimit: types.NewInt(100), From f3593225f851ed1dc051abfffc2171357dff3cf0 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Mon, 14 Oct 2019 13:39:42 +0900 Subject: [PATCH 22/23] add timestamp to chain list command output --- cli/chain.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cli/chain.go b/cli/chain.go index 3782d42d5..7e8896492 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" cid "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -321,7 +322,9 @@ var chainListCmd = &cli.Command{ } for i := len(tss) - 1; i >= 0; i-- { - fmt.Printf("%d [ ", tss[i].Height()) + mints := tss[i].MinTimestamp() + t := time.Unix(int64(mints), 0) + fmt.Printf("%d: (%s) [ ", tss[i].Height(), t.Format(time.Stamp)) for _, b := range tss[i].Blocks() { fmt.Printf("%s: %s,", b.Cid(), b.Miner) } From 4ff717e881b8527dbe81a10694a9df006d2e941e Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 14 Oct 2019 15:51:51 +0200 Subject: [PATCH 23/23] Make the `mined block in the past` structured License: MIT Signed-off-by: Jakub Sztandera --- miner/miner.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/miner/miner.go b/miner/miner.go index 65b1d69e0..be9025a42 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -179,7 +179,8 @@ func (m *Miner) mine(ctx context.Context) { if time.Now().Before(btime) { time.Sleep(time.Until(btime)) } else { - log.Warnf("Mined block in the past: b.T: %s, T: %s, dT: %s", btime, time.Now(), time.Now().Sub(btime)) + log.Warnw("mined block in the past", "block-time", btime, + "time", time.Now(), "duration", time.Now().Sub(btime)) } if err := m.api.ChainSubmitBlock(ctx, b); err != nil {
{k}{v.NodeName}{v.Height}{k}{v.NodeName}{v.Height}{mnrs}