diff --git a/api/api.go b/api/api.go index 153843535..70f72814d 100644 --- a/api/api.go +++ b/api/api.go @@ -58,6 +58,7 @@ type FullNode interface { MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error) MpoolPush(context.Context, *types.SignedMessage) error + MpoolPushMessage(context.Context, *types.Message) error // get nonce, sign, push MpoolGetNonce(context.Context, address.Address) (uint64, error) // FullNodeStruct diff --git a/api/struct.go b/api/struct.go index 97f7ba9fb..005c70965 100644 --- a/api/struct.go +++ b/api/struct.go @@ -50,6 +50,7 @@ type FullNodeStruct struct { MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"` MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"` + MpoolPushMessage func(context.Context, *types.Message) error`perm:"sign"` MinerRegister func(context.Context, address.Address) error `perm:"admin"` MinerUnregister func(context.Context, address.Address) error `perm:"admin"` @@ -192,6 +193,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag return c.Internal.MpoolPush(ctx, smsg) } +func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message) error { + return c.Internal.MpoolPushMessage(ctx, msg) +} + func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error { return c.Internal.MinerRegister(ctx, addr) } diff --git a/chain/messagepool.go b/chain/messagepool.go index 785f1da6a..6eb94cab3 100644 --- a/chain/messagepool.go +++ b/chain/messagepool.go @@ -2,6 +2,7 @@ package chain import ( "encoding/base64" + pubsub "github.com/libp2p/go-libp2p-pubsub" "sync" "github.com/filecoin-project/go-lotus/chain/address" @@ -16,6 +17,8 @@ type MessagePool struct { pending map[address.Address]*msgSet sm *stmgr.StateManager + + ps *pubsub.PubSub } type msgSet struct { @@ -36,20 +39,38 @@ func (ms *msgSet) add(m *types.SignedMessage) { ms.msgs[m.Message.Nonce] = m } -func NewMessagePool(sm *stmgr.StateManager) *MessagePool { +func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool { mp := &MessagePool{ pending: make(map[address.Address]*msgSet), sm: sm, + ps: ps, } sm.ChainStore().SubscribeHeadChanges(mp.HeadChange) return mp } +func (mp *MessagePool) Push(m *types.SignedMessage) error { + msgb, err := m.Serialize() + if err != nil { + return err + } + + if err := mp.Add(m); err != nil { + return err + } + + return mp.ps.Publish("/fil/messages", msgb) +} + func (mp *MessagePool) Add(m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() + return mp.addLocked(m) +} + +func (mp *MessagePool) addLocked(m *types.SignedMessage) error { data, err := m.Message.Serialize() if err != nil { return err @@ -79,6 +100,10 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { mp.lk.Lock() defer mp.lk.Unlock() + return mp.getNonceLocked(addr) +} + +func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) { mset, ok := mp.pending[addr] if ok { return mset.startNonce + uint64(len(mset.msgs)), nil @@ -92,6 +117,32 @@ func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) { return act.Nonce, nil } +func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) error { + mp.lk.Lock() + defer mp.lk.Unlock() + + nonce, err := mp.getNonceLocked(addr) + if err != nil { + return err + } + + msg, err := cb(nonce) + if err != nil { + return err + } + + msgb, err := msg.Serialize() + if err != nil { + return err + } + + if err := mp.addLocked(msg); err != nil { + return err + } + + return mp.ps.Publish("/fil/messages", msgb) +} + func (mp *MessagePool) Remove(from address.Address, nonce uint64) { mp.lk.Lock() defer mp.lk.Unlock() diff --git a/lotuspond/front/src/chain/send.js b/lotuspond/front/src/chain/send.js index 2a3d0e4f6..d09333dd6 100644 --- a/lotuspond/front/src/chain/send.js +++ b/lotuspond/front/src/chain/send.js @@ -19,8 +19,6 @@ async function pushMessage(client, from, inmsg) { inmsg.Method = 0 } - inmsg.Nonce = await client.call('Filecoin.MpoolGetNonce', [from]) - /* const msg = [ inmsg.To, inmsg.From, @@ -36,11 +34,9 @@ async function pushMessage(client, from, inmsg) { Buffer.from(inmsg.Params, 'base64'), ]*/ - const signed = await client.call('Filecoin.WalletSignMessage', [from, inmsg]) + console.log(inmsg) - console.log(signed) - - await client.call('Filecoin.MpoolPush', [signed]) + await client.call('Filecoin.MpoolPushMessage', [inmsg]) } export default pushMessage \ No newline at end of file diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 5afc41d9b..6a5daa2f7 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -3,18 +3,18 @@ package full import ( "context" "go.uber.org/fx" + "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain/address" "github.com/filecoin-project/go-lotus/chain/types" - - pubsub "github.com/libp2p/go-libp2p-pubsub" ) type MpoolAPI struct { fx.In - PubSub *pubsub.PubSub + WalletAPI + Mpool *chain.MessagePool } @@ -25,17 +25,21 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types } func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error { - msgb, err := smsg.Serialize() - if err != nil { - return err - } - if err := a.Mpool.Add(smsg); err != nil { - return err + return a.Mpool.Push(smsg) +} + +func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) error { + if msg.Nonce != 0 { + return xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce) } - return a.PubSub.Publish("/fil/messages", msgb) + return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) { + msg.Nonce = nonce + return a.WalletSignMessage(ctx, msg.From, msg) + }) } + func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) { return a.Mpool.GetNonce(addr) }