More atomic mpool APIs

This commit is contained in:
Łukasz Magiera 2019-09-16 16:17:08 +02:00
parent 385e0cfd48
commit 1e07a12a2e
5 changed files with 74 additions and 17 deletions

View File

@ -58,6 +58,7 @@ type FullNode interface {
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error
MpoolPushMessage(context.Context, *types.Message) error // get nonce, sign, push
MpoolGetNonce(context.Context, address.Address) (uint64, error)
// FullNodeStruct

View File

@ -50,6 +50,7 @@ type FullNodeStruct struct {
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) error`perm:"sign"`
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
@ -192,6 +193,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
return c.Internal.MpoolPush(ctx, smsg)
}
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message) error {
return c.Internal.MpoolPushMessage(ctx, msg)
}
func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error {
return c.Internal.MinerRegister(ctx, addr)
}

View File

@ -2,6 +2,7 @@ package chain
import (
"encoding/base64"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"sync"
"github.com/filecoin-project/go-lotus/chain/address"
@ -16,6 +17,8 @@ type MessagePool struct {
pending map[address.Address]*msgSet
sm *stmgr.StateManager
ps *pubsub.PubSub
}
type msgSet struct {
@ -36,20 +39,38 @@ func (ms *msgSet) add(m *types.SignedMessage) {
ms.msgs[m.Message.Nonce] = m
}
func NewMessagePool(sm *stmgr.StateManager) *MessagePool {
func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
mp := &MessagePool{
pending: make(map[address.Address]*msgSet),
sm: sm,
ps: ps,
}
sm.ChainStore().SubscribeHeadChanges(mp.HeadChange)
return mp
}
func (mp *MessagePool) Push(m *types.SignedMessage) error {
msgb, err := m.Serialize()
if err != nil {
return err
}
if err := mp.Add(m); err != nil {
return err
}
return mp.ps.Publish("/fil/messages", msgb)
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.addLocked(m)
}
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
data, err := m.Message.Serialize()
if err != nil {
return err
@ -79,6 +100,10 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.getNonceLocked(addr)
}
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
mset, ok := mp.pending[addr]
if ok {
return mset.startNonce + uint64(len(mset.msgs)), nil
@ -92,6 +117,32 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
return act.Nonce, nil
}
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) error {
mp.lk.Lock()
defer mp.lk.Unlock()
nonce, err := mp.getNonceLocked(addr)
if err != nil {
return err
}
msg, err := cb(nonce)
if err != nil {
return err
}
msgb, err := msg.Serialize()
if err != nil {
return err
}
if err := mp.addLocked(msg); err != nil {
return err
}
return mp.ps.Publish("/fil/messages", msgb)
}
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
mp.lk.Lock()
defer mp.lk.Unlock()

View File

@ -19,8 +19,6 @@ async function pushMessage(client, from, inmsg) {
inmsg.Method = 0
}
inmsg.Nonce = await client.call('Filecoin.MpoolGetNonce', [from])
/* const msg = [
inmsg.To,
inmsg.From,
@ -36,11 +34,9 @@ async function pushMessage(client, from, inmsg) {
Buffer.from(inmsg.Params, 'base64'),
]*/
const signed = await client.call('Filecoin.WalletSignMessage', [from, inmsg])
console.log(inmsg)
console.log(signed)
await client.call('Filecoin.MpoolPush', [signed])
await client.call('Filecoin.MpoolPushMessage', [inmsg])
}
export default pushMessage

View File

@ -3,18 +3,18 @@ package full
import (
"context"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type MpoolAPI struct {
fx.In
PubSub *pubsub.PubSub
WalletAPI
Mpool *chain.MessagePool
}
@ -25,17 +25,21 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types
}
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {
msgb, err := smsg.Serialize()
if err != nil {
return err
}
if err := a.Mpool.Add(smsg); err != nil {
return err
return a.Mpool.Push(smsg)
}
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) error {
if msg.Nonce != 0 {
return xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
}
return a.PubSub.Publish("/fil/messages", msgb)
return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) {
msg.Nonce = nonce
return a.WalletSignMessage(ctx, msg.From, msg)
})
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}