Merge branch 'master' of github.com:eshon/lotus

This commit is contained in:
eshon 2019-10-14 16:02:14 +02:00
commit 9673098b18
33 changed files with 583 additions and 67 deletions

1
.gitignore vendored
View File

@ -1,6 +1,7 @@
/lotus
/lotus-storage-miner
/pond
/townhall
/lotuspond/front/node_modules
/lotuspond/front/build
**/*.h

View File

@ -88,6 +88,13 @@ pond: build
(cd lotuspond/front && npm i && npm run build)
.PHONY: pond
townhall:
rm -f townhall
go build -o townhall ./cmd/lotus-townhall
(cd ./cmd/lotus-townhall/townhall && npm i && npm run build)
go run github.com/GeertJohan/go.rice/rice append --exec townhall -i ./cmd/lotus-townhall
.PHONY: townhall
clean:
rm -rf $(CLEAN)
-$(MAKE) -C $(BLS_PATH) clean

View File

@ -129,6 +129,8 @@ type FullNode interface {
StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error)
StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
StateListMiners(context.Context, *types.TipSet) ([]address.Address, error)
StateListActors(context.Context, *types.TipSet) ([]address.Address, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error)

View File

@ -95,6 +95,8 @@ type FullNodeStruct struct {
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"`
StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
StateListMiners func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"`
StateListActors func(context.Context, *types.TipSet) ([]address.Address, error) `perm:"read"`
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
@ -368,6 +370,13 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, ts *types.Ti
func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.StateWaitMsg(ctx, msgc)
}
func (c *FullNodeStruct) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
return c.Internal.StateListMiners(ctx, ts)
}
func (c *FullNodeStruct) StateListActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
return c.Internal.StateListActors(ctx, ts)
}
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)

View File

@ -450,6 +450,28 @@ func MinerSetHas(ctx context.Context, vmctx types.VMContext, rcid cid.Cid, maddr
}
}
func MinerSetList(ctx context.Context, cst *hamt.CborIpldStore, rcid cid.Cid) ([]address.Address, error) {
nd, err := hamt.LoadNode(ctx, cst, rcid)
if err != nil {
return nil, xerrors.Errorf("failed to load miner set: %w", err)
}
var out []address.Address
err = nd.ForEach(ctx, func(k string, val interface{}) error {
addr, err := address.NewFromBytes([]byte(k))
if err != nil {
return err
}
out = append(out, addr)
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func MinerSetAdd(ctx context.Context, vmctx types.VMContext, rcid cid.Cid, maddr address.Address) (cid.Cid, aerrors.ActorError) {
nd, err := hamt.LoadNode(ctx, vmctx.Ipld(), rcid)
if err != nil {

View File

@ -10,6 +10,7 @@ import (
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/store"
@ -86,6 +87,9 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {
}
func (bss *BlockSyncService) HandleStream(s inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "blocksync.HandleStream")
defer span.End()
defer s.Close()
var req BlockSyncRequest
@ -95,7 +99,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)
resp, err := bss.processRequest(&req)
resp, err := bss.processRequest(ctx, &req)
if err != nil {
log.Error("failed to process block sync request: ", err)
return
@ -107,7 +111,10 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
}
}
func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncResponse, error) {
func (bss *BlockSyncService) processRequest(ctx context.Context, req *BlockSyncRequest) (*BlockSyncResponse, error) {
ctx, span := trace.StartSpan(ctx, "blocksync.ProcessRequest")
defer span.End()
opts := ParseBSOptions(req.Options)
if len(req.Start) == 0 {
return &BlockSyncResponse{
@ -116,6 +123,11 @@ func (bss *BlockSyncService) processRequest(req *BlockSyncRequest) (*BlockSyncRe
}, nil
}
span.AddAttributes(
trace.BoolAttribute("blocks", opts.IncludeBlocks),
trace.BoolAttribute("messages", opts.IncludeMessages),
)
chain, err := bss.collectChainSegment(req.Start, req.RequestLength, opts)
if err != nil {
log.Error("encountered error while responding to block sync request: ", err)
@ -253,6 +265,15 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
}
func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
defer span.End()
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(tipset)),
trace.Int64Attribute("count", int64(count)),
)
}
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
@ -321,6 +342,9 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
}
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
defer span.End()
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error

View File

@ -132,9 +132,12 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
}
e.lk.Unlock()
if err := hnd(ts, bestH); err != nil {
return err
}
e.lk.Lock()
bestH = e.tsc.best().Height()
}
if bestH >= h+uint64(confidence)+e.gcConfidence {

View File

@ -364,6 +364,74 @@ func TestAtStartConfidence(t *testing.T) {
require.Equal(t, false, reverted)
}
func TestAtChained(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
return events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 10)
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 15, nil)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestAtChainedConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
fcs.advance(0, 15, nil)
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
return events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 10, int(ts.Height()))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 10)
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}
func TestCalled(t *testing.T) {
fcs := &fakeCS{
t: t,

View File

@ -67,9 +67,6 @@ type mybs struct {
func (m mybs) Get(c cid.Cid) (block.Block, error) {
b, err := m.Blockstore.Get(c)
if err != nil {
// change to error for stacktraces, don't commit with that pls
// TODO: debug why we get so many not founds in tests
log.Warnf("Get failed: %s %s", c, err)
return nil, err
}

View File

@ -1,6 +1,7 @@
package chain
import (
"fmt"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -12,19 +13,37 @@ import (
"github.com/filecoin-project/go-lotus/chain/types"
)
var (
ErrMessageTooBig = fmt.Errorf("message too big")
ErrMessageValueTooHigh = fmt.Errorf("cannot send more filecoin than will ever exist")
ErrNonceTooLow = fmt.Errorf("message nonce too low")
ErrNotEnoughFunds = fmt.Errorf("not enough funds to execute transaction")
ErrInvalidToAddr = fmt.Errorf("message had invalid to address")
)
type MessagePool struct {
lk sync.Mutex
pending map[address.Address]*msgSet
pending map[address.Address]*msgSet
pendingCount int
sm *stmgr.StateManager
ps *pubsub.PubSub
minGasPrice types.BigInt
maxTxPoolSize int
}
type msgSet struct {
msgs map[uint64]*types.SignedMessage
nextNonce uint64
msgs map[uint64]*types.SignedMessage
nextNonce uint64
curBalance types.BigInt
}
func newMsgSet() *msgSet {
@ -51,9 +70,11 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
mp := &MessagePool{
pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
minGasPrice: types.NewInt(0),
maxTxPoolSize: 100000,
}
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
@ -74,6 +95,42 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
// big messages are bad, anti DOS
if m.Size() > 32*1024 {
return ErrMessageTooBig
}
if m.Message.To == address.Undef {
return ErrInvalidToAddr
}
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
return ErrMessageValueTooHigh
}
if err := m.Signature.Verify(m.Message.From, m.Message.Cid().Bytes()); err != nil {
log.Warnf("mpooladd signature verification failed: %s", err)
return err
}
snonce, err := mp.getStateNonce(m.Message.From)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %w", err)
}
if snonce > m.Message.Nonce {
return ErrNonceTooLow
}
balance, err := mp.getStateBalance(m.Message.From)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %w", err)
}
if balance.LessThan(m.Message.RequiredFunds()) {
return ErrNotEnoughFunds
}
mp.lk.Lock()
defer mp.lk.Unlock()
@ -83,11 +140,6 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
if err := m.Signature.Verify(m.Message.From, m.Message.Cid().Bytes()); err != nil {
log.Warnf("mpooladd signature verification failed: %s", err)
return err
}
if _, err := mp.sm.ChainStore().PutMessage(m); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err
@ -116,6 +168,10 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return mset.nextNonce, nil
}
return mp.getStateNonce(addr)
}
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
act, err := mp.sm.GetActor(addr, nil)
if err != nil {
return 0, err
@ -124,6 +180,15 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return act.Nonce, nil
}
func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, error) {
act, err := mp.sm.GetActor(addr, nil)
if err != nil {
return types.EmptyInt, err
}
return act.Balance, nil
}
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
mp.lk.Lock()
defer mp.lk.Unlock()

View File

@ -47,12 +47,15 @@ func cidsToKey(cids []cid.Cid) string {
}
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "tipSetState")
defer span.End()
ck := cidsToKey(ts.Cids())
sm.stlk.Lock()
cached, ok := sm.stCache[ck]
sm.stlk.Unlock()
if ok {
span.AddAttributes(trace.BoolAttribute("cache", true))
return cached[0], cached[1], nil
}
@ -110,11 +113,10 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl
return cid.Undef, cid.Undef, xerrors.Errorf("failed to get miner owner actor")
}
if err := vm.DeductFunds(netact, reward); err != nil {
if err := vm.Transfer(netact, act, reward); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("failed to deduct funds from network actor: %w", err)
}
vm.DepositFunds(act, reward)
}
// TODO: can't use method from chainstore because it doesnt let us know who the block miners were
@ -426,3 +428,34 @@ func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*t
return nil, nil
}
func (sm *StateManager) ListAllActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
if ts == nil {
ts = sm.ChainStore().GetHeaviestTipSet()
}
st, _, err := sm.TipSetState(ctx, ts)
if err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(sm.ChainStore().Blockstore())
r, err := hamt.LoadNode(ctx, cst, st)
if err != nil {
return nil, err
}
var out []address.Address
err = r.ForEach(ctx, func(k string, val interface{}) error {
addr, err := address.NewFromBytes([]byte(k))
if err != nil {
return xerrors.Errorf("address in state tree was not valid: %w", err)
}
out = append(out, addr)
return nil
})
if err != nil {
return nil, err
}
return out, nil
}

View File

@ -352,6 +352,9 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
}
func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet) error {
ctx, span := trace.StartSpan(ctx, "validateTipSet")
defer span.End()
ts := fts.TipSet()
if ts.Equals(syncer.Genesis) {
return nil
@ -423,6 +426,8 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
// Should match up with 'Semantical Validation' in validation.md in the spec
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) error {
ctx, span := trace.StartSpan(ctx, "validateBlock")
defer span.End()
h := b.Header
@ -488,9 +493,22 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return xerrors.Errorf("miner created a block but was not a winner")
}
if err := syncer.checkBlockMessages(ctx, b, baseTs); err != nil {
return xerrors.Errorf("block had invalid messages: %w", err)
}
return nil
}
func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock, baseTs *types.TipSet) error {
nonces := make(map[address.Address]uint64)
balances := make(map[address.Address]types.BigInt)
stateroot, _, err := syncer.sm.TipSetState(ctx, baseTs)
if err != nil {
return err
}
cst := hamt.CSTFromBstore(syncer.store.Blockstore())
st, err := state.LoadStateTree(cst, stateroot)
if err != nil {
@ -498,6 +516,10 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
}
checkMsg := func(m *types.Message) error {
if m.To == address.Undef {
return xerrors.New("'To' address cannot be empty")
}
if _, ok := nonces[m.From]; !ok {
act, err := st.GetActor(m.From)
if err != nil {
@ -542,7 +564,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
pubks = append(pubks, pubk)
}
if err := syncer.verifyBlsAggregate(h.BLSAggregate, sigCids, pubks); err != nil {
if err := syncer.verifyBlsAggregate(b.Header.BLSAggregate, sigCids, pubks); err != nil {
return xerrors.Errorf("bls aggregate signature was invalid: %w", err)
}
@ -583,7 +605,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return err
}
if h.Messages != mrcid {
if b.Header.Messages != mrcid {
return fmt.Errorf("messages didnt match message root in header")
}
@ -606,6 +628,14 @@ func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pu
}
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End()
span.AddAttributes(
trace.Int64Attribute("fromHeight", int64(from.Height())),
trace.Int64Attribute("toHeight", int64(to.Height())),
)
blockSet := []*types.TipSet{from}
at := from.Parents()
@ -647,7 +677,7 @@ loop:
if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
window = gap
}
blks, err := syncer.Bsync.GetBlocks(context.TODO(), at, window)
blks, err := syncer.Bsync.GetBlocks(ctx, at, window)
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
@ -726,7 +756,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *type
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(ctx, fts); err != nil {
log.Errorf("failed to validate tipset: %+v", err)
@ -740,7 +770,10 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
}
// fills out each of the given tipsets with messages and calls the callback with it
func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.FullTipSet) error) error {
func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error {
ctx, span := trace.StartSpan(ctx, "iterFullTipsets")
defer span.End()
beg := len(headers) - 1
// handle case where we have a prefix of these locally
for ; beg >= 0; beg-- {
@ -751,7 +784,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
if fts == nil {
break
}
if err := cb(fts); err != nil {
if err := cb(ctx, fts); err != nil {
return err
}
}
@ -767,7 +800,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
}
next := headers[i-batchSize]
bstips, err := syncer.Bsync.GetChainMessages(context.TODO(), next, uint64(batchSize+1))
bstips, err := syncer.Bsync.GetChainMessages(ctx, next, uint64(batchSize+1))
if err != nil {
return xerrors.Errorf("message processing failed: %w", err)
}
@ -788,7 +821,7 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
return xerrors.Errorf("message processing failed: %w", err)
}
if err := cb(fts); err != nil {
if err := cb(ctx, fts); err != nil {
return err
}
@ -828,6 +861,9 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
}
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "collectChain")
defer span.End()
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())

View File

@ -16,6 +16,8 @@ import (
const BigIntMaxSerializedLen = 128 // is this big enough? or too big?
var TotalFilecoinInt = FromFil(build.TotalFilecoin)
func init() {
cbor.RegisterCborType(atlas.BuildEntry(BigInt{}).Transform().
TransformMarshal(atlas.MakeMarshalTransformFunc(

View File

@ -62,6 +62,16 @@ func (sm *SignedMessage) Serialize() ([]byte, error) {
return buf.Bytes(), nil
}
func (sm *SignedMessage) Size() int {
serdata, err := sm.Serialize()
if err != nil {
log.Errorf("serializing message failed: %s", err)
return 0
}
return len(serdata)
}
func (sm *SignedMessage) VMMessage() *Message {
return &sm.Message
}

View File

@ -396,10 +396,9 @@ func (vm *VM) send(ctx context.Context, msg *types.Message, parent *VMContext,
return nil, aerrors.Wrap(aerr, "sending funds"), nil
}
if err := DeductFunds(fromActor, msg.Value); err != nil {
return nil, aerrors.Absorb(err, 1, "failed to deduct funds"), nil
if err := Transfer(fromActor, toActor, msg.Value); err != nil {
return nil, aerrors.Absorb(err, 1, "failed to transfer funds"), nil
}
DepositFunds(toActor, msg.Value)
}
if msg.Method != 0 {
@ -463,8 +462,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet,
if fromActor.Balance.LessThan(totalCost) {
return nil, xerrors.Errorf("not enough funds (%s < %s)", fromActor.Balance, totalCost)
}
if err := DeductFunds(fromActor, gascost); err != nil {
return nil, xerrors.Errorf("failed to deduct funds: %w", err)
gasHolder := &types.Actor{Balance: types.NewInt(0)}
if err := Transfer(fromActor, gasHolder, gascost); err != nil {
return nil, xerrors.Errorf("failed to withdraw gas funds: %w", err)
}
if msg.Nonce != fromActor.Nonce {
@ -494,7 +495,9 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet,
// refund unused gas
gasUsed = vmctx.GasUsed()
refund := types.BigMul(types.BigSub(msg.GasLimit, gasUsed), msg.GasPrice)
DepositFunds(fromActor, refund)
if err := Transfer(gasHolder, fromActor, refund); err != nil {
return nil, xerrors.Errorf("failed to refund gas")
}
}
miner, err := st.GetActor(vm.blockMiner)
@ -503,7 +506,13 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet,
}
gasReward := types.BigMul(msg.GasPrice, gasUsed)
DepositFunds(miner, gasReward)
if err := Transfer(gasHolder, miner, gasReward); err != nil {
return nil, xerrors.Errorf("failed to give miner gas reward: %w", err)
}
if types.BigCmp(types.NewInt(0), gasHolder.Balance) != 0 {
return nil, xerrors.Errorf("gas handling math is wrong")
}
return &ApplyRet{
MessageReceipt: types.MessageReceipt{
@ -529,6 +538,9 @@ func (vm *VM) ActorBalance(addr address.Address) (types.BigInt, aerrors.ActorErr
}
func (vm *VM) Flush(ctx context.Context) (cid.Cid, error) {
ctx, span := trace.StartSpan(ctx, "vm.Flush")
defer span.End()
from := dag.NewDAGService(bserv.New(vm.buf, nil))
to := dag.NewDAGService(bserv.New(vm.buf.Read(), nil))
@ -597,10 +609,9 @@ func (vm *VM) TransferFunds(from, to address.Address, amt types.BigInt) error {
return err
}
if err := DeductFunds(fromAct, amt); err != nil {
if err := Transfer(fromAct, toAct, amt); err != nil {
return xerrors.Errorf("failed to deduct funds: %w", err)
}
DepositFunds(toAct, amt)
return nil
}
@ -628,7 +639,19 @@ func (vm *VM) Invoke(act *types.Actor, vmctx *VMContext, method uint64, params [
return ret, nil
}
func DeductFunds(act *types.Actor, amt types.BigInt) error {
func Transfer(from, to *types.Actor, amt types.BigInt) error {
if amt.LessThan(types.NewInt(0)) {
return xerrors.Errorf("attempted to transfer negative value")
}
if err := deductFunds(from, amt); err != nil {
return err
}
depositFunds(to, amt)
return nil
}
func deductFunds(act *types.Actor, amt types.BigInt) error {
if act.Balance.LessThan(amt) {
return fmt.Errorf("not enough funds")
}
@ -637,7 +660,7 @@ func DeductFunds(act *types.Actor, amt types.BigInt) error {
return nil
}
func DepositFunds(act *types.Actor, amt types.BigInt) {
func depositFunds(act *types.Actor, amt types.BigInt) {
act.Balance = types.BigAdd(act.Balance, amt)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"
cid "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -276,6 +277,10 @@ func parseTipSet(api api.FullNode, ctx context.Context, vals []string) (*types.T
var chainListCmd = &cli.Command{
Name: "list",
Usage: "View a segment of the chain",
Flags: []cli.Flag{
&cli.Uint64Flag{Name: "height"},
&cli.UintFlag{Name: "count", Value: 30},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -284,29 +289,42 @@ var chainListCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
head, err := api.ChainHead(ctx)
var head *types.TipSet
if cctx.IsSet("height") {
head, err = api.ChainGetTipSetByHeight(ctx, cctx.Uint64("height"), nil)
} else {
head, err = api.ChainHead(ctx)
}
if err != nil {
return err
}
tss := []*types.TipSet{head}
cur := head
for i := 1; i < 30; i++ {
if cur.Height() == 0 {
count := cctx.Uint("count")
if count < 1 {
return nil
}
tss := make([]*types.TipSet, count)
tss[0] = head
for i := 1; i < len(tss); i++ {
if head.Height() == 0 {
break
}
next, err := api.ChainGetTipSet(ctx, cur.Parents())
head, err = api.ChainGetTipSet(ctx, head.Parents())
if err != nil {
return err
}
tss = append(tss, next)
cur = next
tss[i] = head
}
for i := len(tss) - 1; i >= 0; i-- {
fmt.Printf("%d [ ", tss[i].Height())
mints := tss[i].MinTimestamp()
t := time.Unix(int64(mints), 0)
fmt.Printf("%d: (%s) [ ", tss[i].Height(), t.Format(time.Stamp))
for _, b := range tss[i].Blocks() {
fmt.Printf("%s: %s,", b.Cid(), b.Miner)
}

View File

@ -241,6 +241,14 @@ var clientQueryAskCmd = &cli.Command{
Name: "peerid",
Usage: "specify peer ID of node to make query against",
},
&cli.Int64Flag{
Name: "size",
Usage: "data size in bytes",
},
&cli.Int64Flag{
Name: "duration",
Usage: "deal duration",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
@ -295,7 +303,20 @@ var clientQueryAskCmd = &cli.Command{
}
fmt.Printf("Ask: %s\n", maddr)
fmt.Printf("Price: %s\n", ask.Ask.Price)
fmt.Printf("Price per Byte: %s\n", ask.Ask.Price)
size := cctx.Int64("size")
if size == 0 {
return nil
}
fmt.Printf("Price per Block: %s\n", types.BigMul(ask.Ask.Price, types.NewInt(uint64(size))))
duration := cctx.Int64("duration")
if duration == 0 {
return nil
}
fmt.Printf("Total Price: %s\n", types.BigMul(types.BigMul(ask.Ask.Price, types.NewInt(uint64(size))), types.NewInt(uint64(duration))))
return nil
},
}

View File

@ -18,6 +18,8 @@ var stateCmd = &cli.Command{
stateSectorsCmd,
stateProvingSetCmd,
statePledgeCollateralCmd,
stateListActorsCmd,
stateListMinersCmd,
},
}
@ -199,3 +201,53 @@ var statePledgeCollateralCmd = &cli.Command{
return nil
},
}
var stateListMinersCmd = &cli.Command{
Name: "list-miners",
Usage: "list all miners in the network",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
miners, err := api.StateListMiners(ctx, nil)
if err != nil {
return err
}
for _, m := range miners {
fmt.Println(m.String())
}
return nil
},
}
var stateListActorsCmd = &cli.Command{
Name: "list-actors",
Usage: "list all actors in the network",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
actors, err := api.StateListActors(ctx, nil)
if err != nil {
return err
}
for _, a := range actors {
fmt.Println(a.String())
}
return nil
},
}

View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
"gopkg.in/urfave/cli.v2"
lcli "github.com/filecoin-project/go-lotus/cli"
)
var infoCmd = &cli.Command{
Name: "info",
Usage: "Print storage miner info",
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
aaddr, err := nodeApi.ActorAddresses(ctx)
if err != nil {
return err
}
fmt.Printf("actor address: %s\n", aaddr)
// TODO: grab actr state / info
// * Sector size
// * Sealed sectors (count / bytes)
// * Power
return nil
},
}

View File

@ -21,6 +21,7 @@ func main() {
local := []*cli.Command{
runCmd,
initCmd,
infoCmd,
storeGarbageCmd,
sectorsCmd,
}

View File

@ -7,6 +7,7 @@ import (
"net/http"
"strings"
rice "github.com/GeertJohan/go.rice"
"github.com/gorilla/websocket"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
@ -59,8 +60,9 @@ func main() {
}
http.HandleFunc("/sub", handler(ps))
http.Handle("/", http.FileServer(rice.MustFindBox("townhall/build").HTTPBox()))
fmt.Println("listening")
fmt.Println("listening on http://localhost:2975")
if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil {
panic(err)

View File

@ -5,8 +5,8 @@ class App extends React.Component {
constructor(props) {
super(props);
//let ws = new WebSocket("ws://" + window.location.host + "/sub")
let ws = new WebSocket("ws://127.0.0.1:2975/sub")
let ws = new WebSocket("ws://" + window.location.host + "/sub")
//let ws = new WebSocket("ws://127.0.0.1:2975/sub")
ws.onmessage = (ev) => {
console.log(ev)
@ -25,7 +25,9 @@ class App extends React.Component {
console.log(best)
return <table>{Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => {
let l = [<td>{k}</td>, <td>{v.NodeName}</td>, <td>{v.Height}</td>]
let mnrs = v.Blocks.map(b => <span>&nbsp;m:{b.Miner}</span>)
let l = [<td>{k}</td>, <td>{v.NodeName}</td>, <td>{v.Height}</td>, <td>{mnrs}</td>]
if (best !== v.Height) {
l = <tr style={{color: '#f00'}}>{l}</tr>
} else {

View File

@ -1,9 +1,30 @@
## Tracing
Lotus uses Jaeger for tracing. Currently it always uses
localhost and default port (`localhost:6831`).
Lotus uses [OpenCensus](https://opencensus.io/) for tracing application flow.
This generates spans
through the execution of annotated code paths.
During dev you can use `jaeger-all-in-one` from: https://www.jaegertracing.io/download/
Start the `jaeger-all-in-one` and open http://localhost:16686/ to view traces.
j
In production: you tell me and I might WTFM.
Currently it is set up to use jaeger, though other tracing backends should be
fairly easy to swap in.
## Running Locally
To easily run and view tracing locally, first, install jaeger. The easiest way
to do this is to download the binaries from
https://www.jaegertracing.io/download/ and then run the `jaeger-all-in-one`
binary. This will start up jaeger, listen for spans on `localhost:6831`, and
expose a web UI for viewing traces on `http://localhost:16686/`.
Now, to start sending traces from lotus to jaeger, set the environment variable
`LOTUS_JAEGER` to `localhost:6831`, and start the `lotus daemon`.
Now, to view any generated traces, open up `http://localhost:16686/` in your
browser.
## Adding Spans
To annotate a new codepath with spans, add the following lines to the top of the function you wish to trace:
```go
ctx, span := trace.StartSpan(ctx, "put function name here")
defer span.End()
```

View File

@ -163,7 +163,7 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
buf.PushBack(val)
if buf.Len() > 1 {
log.Warnf("rpc output channel has %d buffered messages", buf.Len())
log.Warnw("rpc output message buffer", "n", buf.Len())
bufLk.Unlock()
return
}

View File

@ -179,7 +179,8 @@ func (m *Miner) mine(ctx context.Context) {
if time.Now().Before(btime) {
time.Sleep(time.Until(btime))
} else {
log.Warnf("Mined block in the past: b.T: %s, T: %s, dT: %s", btime, time.Now(), time.Now().Sub(btime))
log.Warnw("mined block in the past", "block-time", btime,
"time", time.Now(), "duration", time.Now().Sub(btime))
}
if err := m.api.ChainSubmitBlock(ctx, b); err != nil {
@ -323,6 +324,11 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt)
for _, msg := range msgs {
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
continue
}
from := msg.Message.From
act, err := al(ctx, from, base.ts)
if err != nil {

View File

@ -40,6 +40,7 @@ func TestMessageFiltering(t *testing.T) {
msgs := []types.Message{
types.Message{
From: a1,
To: a1,
Nonce: 3,
Value: types.NewInt(500),
GasLimit: types.NewInt(50),
@ -47,6 +48,7 @@ func TestMessageFiltering(t *testing.T) {
},
types.Message{
From: a1,
To: a1,
Nonce: 4,
Value: types.NewInt(500),
GasLimit: types.NewInt(50),
@ -54,6 +56,7 @@ func TestMessageFiltering(t *testing.T) {
},
types.Message{
From: a2,
To: a1,
Nonce: 1,
Value: types.NewInt(800),
GasLimit: types.NewInt(100),
@ -61,6 +64,7 @@ func TestMessageFiltering(t *testing.T) {
},
types.Message{
From: a2,
To: a1,
Nonce: 0,
Value: types.NewInt(800),
GasLimit: types.NewInt(100),
@ -68,6 +72,7 @@ func TestMessageFiltering(t *testing.T) {
},
types.Message{
From: a2,
To: a1,
Nonce: 2,
Value: types.NewInt(150),
GasLimit: types.NewInt(100),

View File

@ -203,6 +203,8 @@ func Online() Option {
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull },
// TODO: Fix offline mode
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
Override(new(*store.ChainStore), modules.ChainStore),
@ -294,6 +296,10 @@ func Config(cfg *config.Root) Option {
ApplyIf(func(s *Settings) bool { return s.Online },
Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)),
ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 },
Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)),
),
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeFull },
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),

View File

@ -19,6 +19,7 @@ type API struct {
// Libp2p contains configs for libp2p
type Libp2p struct {
ListenAddresses []string
BootstrapPeers []string
}
type Metrics struct {

View File

@ -207,3 +207,22 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait,
TipSet: ts,
}, nil
}
func (a *StateAPI) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
var state actors.StorageMarketState
if _, err := a.StateManager.LoadActorState(ctx, actors.StorageMarketAddress, &state, ts); err != nil {
return nil, err
}
cst := hamt.CSTFromBstore(a.StateManager.ChainStore().Blockstore())
miners, err := actors.MinerSetList(ctx, cst, state.Miners)
if err != nil {
return nil, err
}
return miners, nil
}
func (a *StateAPI) StateListActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
return a.StateManager.ListAllActors(ctx, ts)
}

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/lib/addrutil"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/gbrlsnchs/jwt/v3"
logging "github.com/ipfs/go-log"
@ -76,7 +77,17 @@ func APISecret(keystore types.KeyStore, lr repo.LockedRepo) (*dtypes.APIAlg, err
return (*dtypes.APIAlg)(jwt.NewHS256(key.PrivateKey)), nil
}
func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) {
func ConfigBootstrap(peers []string) func() (dtypes.BootstrapPeers, error) {
return func() (dtypes.BootstrapPeers, error) {
return addrutil.ParseAddresses(context.TODO(), peers)
}
}
func BuiltinBootstrap() (dtypes.BootstrapPeers, error) {
return build.BuiltinBootstrap()
}
func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, pinfos dtypes.BootstrapPeers) {
ctx, cancel := context.WithCancel(mctx)
lc.Append(fx.Hook{
@ -97,13 +108,7 @@ func Bootstrap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) {
log.Warn("No peers connected, performing automatic bootstrap")
pis, err := build.BuiltinBootstrap()
if err != nil {
log.Error("Getting bootstrap addrs: ", err)
return
}
for _, pi := range pis {
for _, pi := range pinfos {
if err := host.Connect(ctx, pi); err != nil {
log.Warn("bootstrap connect failed: ", err)
}

View File

@ -0,0 +1,5 @@
package dtypes
import "github.com/libp2p/go-libp2p-core/peer"
type BootstrapPeers []peer.AddrInfo

View File

@ -11,6 +11,8 @@ filebeat.inputs:
type: lotus-miner
fields_under_root: true
json.keys_under_root: false
json.message_key: msg
encoding: utf-8
ignore_older: 3h
- type: log
@ -22,6 +24,7 @@ filebeat.inputs:
type: lotus-daemon
fields_under_root: true
json.keys_under_root: false
json.message_key: msg
encoding: utf-8
ignore_older: 3h
@ -55,6 +58,13 @@ processors:
- '2019-10-10T22:37:48.297+0200'
- drop_fields:
fields: ['json.ts']
- if:
has_fields: ['json.msg']
then:
- rename:
fields:
- from: 'json.msg'
to: 'message'
############################# Output ##########################################

View File

@ -97,7 +97,13 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
}
func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error {
called := 0
return func(ts *types.TipSet, curH uint64) error {
if called > 0 {
log.Errorw("BUG: computePost callback called again", "ppe", ppe,
"height", ts.Height(), "curH", curH, "called", called)
}
called++
ctx := context.TODO()