api.MpoolSub

This commit is contained in:
Łukasz Magiera 2019-11-17 08:44:06 +01:00
parent 80095fdf3c
commit 1f913b8df2
5 changed files with 98 additions and 4 deletions

View File

@ -44,6 +44,7 @@ type FullNode interface {
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
// FullNodeStruct
@ -276,3 +277,15 @@ const (
StageMessages
StageSyncComplete
)
type MpoolChange int
const (
MpoolAdd MpoolChange = iota
MpoolRemove
)
type MpoolUpdate struct {
Type MpoolChange
Message *types.SignedMessage
}

View File

@ -57,6 +57,8 @@ type FullNodeStruct struct {
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
MpoolSub func(context.Context) (<-chan MpoolUpdate, error) `perm:"read"`
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
@ -74,8 +76,6 @@ type FullNodeStruct struct {
WalletExport func(context.Context, address.Address) (*types.KeyInfo, error) `perm:"admin"`
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"write"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
@ -227,6 +227,10 @@ func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Messag
return c.Internal.MpoolPushMessage(ctx, msg)
}
func (c *FullNodeStruct) MpoolSub(ctx context.Context) (<-chan MpoolUpdate, error) {
return c.Internal.MpoolSub(ctx)
}
func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error {
return c.Internal.MinerRegister(ctx, addr)
}

View File

@ -1,17 +1,19 @@
package chain
import (
"context"
"errors"
"sort"
"sync"
"time"
"errors"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
lps "github.com/whyrusleeping/pubsub"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr"
@ -32,6 +34,8 @@ var (
const (
msgTopic = "/fil/messages"
localUpdates = "update"
)
type MessagePool struct {
@ -54,6 +58,8 @@ type MessagePool struct {
maxTxPoolSize int
blsSigCache *lru.TwoQueueCache
changes *lps.PubSub
}
type msgSet struct {
@ -95,6 +101,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
minGasPrice: types.NewInt(0),
maxTxPoolSize: 100000,
blsSigCache: cache,
changes: lps.New(50),
}
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app)
@ -231,6 +238,11 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
}
mset.add(m)
mp.changes.Pub(api.MpoolUpdate{
Type: api.MpoolAdd,
Message: m,
}, localUpdates)
return nil
}
@ -304,6 +316,11 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
return
}
mp.changes.Pub(api.MpoolUpdate{
Type: api.MpoolRemove,
Message: mset.msgs[nonce],
}, localUpdates)
// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
delete(mset.msgs, nonce)
@ -413,3 +430,25 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
Signature: sig,
}
}
func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, error) {
out := make(chan api.MpoolUpdate, 20)
sub := mp.changes.Sub(localUpdates)
go func() {
for {
select {
case u := <-sub:
select {
case out <- u.(api.MpoolUpdate):
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}

View File

@ -12,6 +12,7 @@ var mpoolCmd = &cli.Command{
Usage: "Manage message pool",
Subcommands: []*cli.Command{
mpoolPending,
mpoolSub,
},
}
@ -43,3 +44,35 @@ var mpoolPending = &cli.Command{
return nil
},
}
var mpoolSub = &cli.Command{
Name: "sub",
Usage: "Subscibe to mpool changes",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
sub, err := api.MpoolSub(ctx)
if err != nil {
return err
}
for {
select {
case update := <-sub:
out, err := json.MarshalIndent(update, "", " ")
if err != nil {
return err
}
fmt.Println(string(out))
case <-ctx.Done():
return nil
}
}
},
}

View File

@ -6,6 +6,7 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
@ -53,3 +54,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}
func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {
return a.Mpool.Updates(ctx)
}