Use atomic moppl API in more places
This commit is contained in:
parent
567ae92ff2
commit
5ea1459275
@ -58,7 +58,7 @@ type FullNode interface {
|
|||||||
|
|
||||||
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
|
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
|
||||||
MpoolPush(context.Context, *types.SignedMessage) error
|
MpoolPush(context.Context, *types.SignedMessage) error
|
||||||
MpoolPushMessage(context.Context, *types.Message) error // get nonce, sign, push
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
|
||||||
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
||||||
|
|
||||||
// FullNodeStruct
|
// FullNodeStruct
|
||||||
|
@ -50,7 +50,7 @@ type FullNodeStruct struct {
|
|||||||
|
|
||||||
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
|
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
|
||||||
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
|
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
|
||||||
MpoolPushMessage func(context.Context, *types.Message) error `perm:"sign"`
|
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
|
||||||
|
|
||||||
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
|
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
|
||||||
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
|
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
|
||||||
@ -194,7 +194,7 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag
|
|||||||
return c.Internal.MpoolPush(ctx, smsg)
|
return c.Internal.MpoolPush(ctx, smsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message) error {
|
func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
|
||||||
return c.Internal.MpoolPushMessage(ctx, msg)
|
return c.Internal.MpoolPushMessage(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,30 +117,30 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
|
|||||||
return act.Nonce, nil
|
return act.Nonce, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) error {
|
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
|
||||||
mp.lk.Lock()
|
mp.lk.Lock()
|
||||||
defer mp.lk.Unlock()
|
defer mp.lk.Unlock()
|
||||||
|
|
||||||
nonce, err := mp.getNonceLocked(addr)
|
nonce, err := mp.getNonceLocked(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
msg, err := cb(nonce)
|
msg, err := cb(nonce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
msgb, err := msg.Serialize()
|
msgb, err := msg.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := mp.addLocked(msg); err != nil {
|
if err := mp.addLocked(msg); err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return mp.ps.Publish("/fil/messages", msgb)
|
return msg, mp.ps.Publish("/fil/messages", msgb)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
|
||||||
|
@ -2,7 +2,6 @@ package cli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/filecoin-project/go-lotus/api"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
25
cli/send.go
25
cli/send.go
@ -2,9 +2,8 @@ package cli
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
types "github.com/filecoin-project/go-lotus/chain/types"
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
"gopkg.in/urfave/cli.v2"
|
"gopkg.in/urfave/cli.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -56,39 +55,19 @@ var sendCmd = &cli.Command{
|
|||||||
fromAddr = addr
|
fromAddr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce, err := api.MpoolGetNonce(ctx, fromAddr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
From: fromAddr,
|
From: fromAddr,
|
||||||
To: toAddr,
|
To: toAddr,
|
||||||
Value: val,
|
Value: val,
|
||||||
Nonce: nonce,
|
|
||||||
GasLimit: types.NewInt(1000),
|
GasLimit: types.NewInt(1000),
|
||||||
GasPrice: types.NewInt(0),
|
GasPrice: types.NewInt(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
sermsg, err := msg.Serialize()
|
_, err = api.MpoolPushMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sig, err := api.WalletSign(ctx, fromAddr, sermsg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
smsg := &types.SignedMessage{
|
|
||||||
Message: *msg,
|
|
||||||
Signature: *sig,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := api.MpoolPush(ctx, smsg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -166,31 +166,21 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce, err := api.MpoolGetNonce(ctx, waddr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
To: addr,
|
To: addr,
|
||||||
From: waddr,
|
From: waddr,
|
||||||
Method: actors.MAMethods.UpdatePeerID,
|
Method: actors.MAMethods.UpdatePeerID,
|
||||||
Params: enc,
|
Params: enc,
|
||||||
Nonce: nonce,
|
|
||||||
Value: types.NewInt(0),
|
Value: types.NewInt(0),
|
||||||
GasPrice: types.NewInt(0),
|
GasPrice: types.NewInt(0),
|
||||||
GasLimit: types.NewInt(1000000),
|
GasLimit: types.NewInt(1000000),
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := api.WalletSignMessage(ctx, waddr, msg)
|
smsg, err := api.MpoolPushMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := api.MpoolPush(ctx, smsg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Waiting for message: ", smsg.Cid())
|
log.Info("Waiting for message: ", smsg.Cid())
|
||||||
ret, err := api.ChainWaitMsg(ctx, smsg.Cid())
|
ret, err := api.ChainWaitMsg(ctx, smsg.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -212,11 +202,6 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
|
|||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce, err := api.MpoolGetNonce(ctx, defOwner)
|
|
||||||
if err != nil {
|
|
||||||
return address.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
k, err := api.WalletNew(ctx, types.KTBLS)
|
k, err := api.WalletNew(ctx, types.KTBLS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
@ -234,10 +219,9 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
|
|||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
createStorageMinerMsg := types.Message{
|
createStorageMinerMsg := &types.Message{
|
||||||
To: actors.StorageMarketAddress,
|
To: actors.StorageMarketAddress,
|
||||||
From: defOwner,
|
From: defOwner,
|
||||||
Nonce: nonce,
|
|
||||||
Value: collateral,
|
Value: collateral,
|
||||||
|
|
||||||
Method: actors.SMAMethods.CreateStorageMiner,
|
Method: actors.SMAMethods.CreateStorageMiner,
|
||||||
@ -247,30 +231,12 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID) (
|
|||||||
GasPrice: types.NewInt(0),
|
GasPrice: types.NewInt(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned, err := createStorageMinerMsg.Serialize()
|
signed, err := api.MpoolPushMessage(ctx, createStorageMinerMsg)
|
||||||
if err != nil {
|
|
||||||
return address.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Signing StorageMarket.CreateStorageMiner")
|
|
||||||
|
|
||||||
sig, err := api.WalletSign(ctx, defOwner, unsigned)
|
|
||||||
if err != nil {
|
|
||||||
return address.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
signed := &types.SignedMessage{
|
|
||||||
Message: createStorageMinerMsg,
|
|
||||||
Signature: *sig,
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Pushing %s to Mpool", signed.Cid())
|
|
||||||
|
|
||||||
err = api.MpoolPush(ctx, signed)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, err
|
return address.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
|
||||||
log.Infof("Waiting for confirmation")
|
log.Infof("Waiting for confirmation")
|
||||||
|
|
||||||
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
|
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
|
||||||
|
@ -28,9 +28,9 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) err
|
|||||||
return a.Mpool.Push(smsg)
|
return a.Mpool.Push(smsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) error {
|
func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
|
||||||
if msg.Nonce != 0 {
|
if msg.Nonce != 0 {
|
||||||
return xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
|
return nil, xerrors.Errorf("MpoolPushMessage expects message nonce to be 0, was %d", msg.Nonce)
|
||||||
}
|
}
|
||||||
|
|
||||||
return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) {
|
return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) {
|
||||||
|
@ -17,36 +17,29 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
|
|||||||
return address.Undef, cid.Undef, aerr
|
return address.Undef, cid.Undef, aerr
|
||||||
}
|
}
|
||||||
|
|
||||||
nonce, err := pm.mpool.MpoolGetNonce(ctx, from)
|
enc, aerr := actors.SerializeParams(&actors.ExecParams{
|
||||||
if err != nil {
|
|
||||||
return address.Undef, cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
enc, err := actors.SerializeParams(&actors.ExecParams{
|
|
||||||
Params: params,
|
Params: params,
|
||||||
Code: actors.PaymentChannelActorCodeCid,
|
Code: actors.PaymentChannelActorCodeCid,
|
||||||
})
|
})
|
||||||
|
if aerr != nil {
|
||||||
|
return address.Undef, cid.Undef, aerr
|
||||||
|
}
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
To: actors.InitActorAddress,
|
To: actors.InitActorAddress,
|
||||||
From: from,
|
From: from,
|
||||||
Value: amt,
|
Value: amt,
|
||||||
Nonce: nonce,
|
|
||||||
Method: actors.IAMethods.Exec,
|
Method: actors.IAMethods.Exec,
|
||||||
Params: enc,
|
Params: enc,
|
||||||
GasLimit: types.NewInt(1000000),
|
GasLimit: types.NewInt(1000000),
|
||||||
GasPrice: types.NewInt(0),
|
GasPrice: types.NewInt(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg)
|
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return address.Undef, cid.Undef, err
|
return address.Undef, cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pm.mpool.MpoolPush(ctx, smsg); err != nil {
|
|
||||||
return address.Undef, cid.Undef, err
|
|
||||||
}
|
|
||||||
|
|
||||||
mcid := smsg.Cid()
|
mcid := smsg.Cid()
|
||||||
|
|
||||||
// TODO: wait outside the store lock!
|
// TODO: wait outside the store lock!
|
||||||
@ -78,30 +71,20 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
|
func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from address.Address, amt types.BigInt) error {
|
||||||
nonce, err := pm.mpool.MpoolGetNonce(ctx, from)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &types.Message{
|
msg := &types.Message{
|
||||||
To: ch,
|
To: ch,
|
||||||
From: from,
|
From: from,
|
||||||
Value: amt,
|
Value: amt,
|
||||||
Nonce: nonce,
|
|
||||||
Method: 0,
|
Method: 0,
|
||||||
GasLimit: types.NewInt(1000000),
|
GasLimit: types.NewInt(1000000),
|
||||||
GasPrice: types.NewInt(0),
|
GasPrice: types.NewInt(0),
|
||||||
}
|
}
|
||||||
|
|
||||||
smsg, err := pm.wallet.WalletSignMessage(ctx, from, msg)
|
smsg, err := pm.mpool.MpoolPushMessage(ctx, msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := pm.mpool.MpoolPush(ctx, smsg); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
|
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user