Merge pull request #9174 from filecoin-project/9171-add-uuid-to-message-sent-to-chain-node
feat: message: Add uuid to mpool message sent to chain node from miner
This commit is contained in:
commit
b5ac141936
@ -889,6 +889,11 @@ workflows:
|
||||
suite: itest-deals
|
||||
target: "./itests/deals_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-dup_mpool_messages
|
||||
suite: itest-dup_mpool_messages
|
||||
target: "./itests/dup_mpool_messages_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-gas_estimation
|
||||
suite: itest-gas_estimation
|
||||
|
12
api/types.go
12
api/types.go
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-graphsync"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
@ -53,7 +54,8 @@ type PubsubScore struct {
|
||||
}
|
||||
|
||||
type MessageSendSpec struct {
|
||||
MaxFee abi.TokenAmount
|
||||
MaxFee abi.TokenAmount
|
||||
MsgUuid uuid.UUID
|
||||
}
|
||||
|
||||
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
|
||||
@ -252,10 +254,10 @@ type RestrievalRes struct {
|
||||
}
|
||||
|
||||
// Selector specifies ipld selector string
|
||||
// - if the string starts with '{', it's interpreted as json selector string
|
||||
// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/
|
||||
// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path)
|
||||
// see https://github.com/ipld/go-ipld-selector-text-lite
|
||||
// - if the string starts with '{', it's interpreted as json selector string
|
||||
// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/
|
||||
// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path)
|
||||
// see https://github.com/ipld/go-ipld-selector-text-lite
|
||||
type Selector string
|
||||
|
||||
type DagSpec struct {
|
||||
|
@ -7,6 +7,8 @@ import (
|
||||
type FullNode = api.FullNode
|
||||
type FullNodeStruct = api.FullNodeStruct
|
||||
|
||||
type RawFullNodeAPI FullNode
|
||||
|
||||
func PermissionedFullAPI(a FullNode) FullNode {
|
||||
return api.PermissionedFullAPI(a)
|
||||
}
|
||||
|
Binary file not shown.
Binary file not shown.
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -19,6 +20,7 @@ import (
|
||||
)
|
||||
|
||||
const dsKeyActorNonce = "ActorNextNonce"
|
||||
const dsKeyMsgUUIDSet = "MsgUuidSet"
|
||||
|
||||
var log = logging.Logger("messagesigner")
|
||||
|
||||
@ -91,6 +93,26 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
|
||||
return smsg, nil
|
||||
}
|
||||
|
||||
func (ms *MessageSigner) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
|
||||
|
||||
key := datastore.KeyWithNamespaces([]string{dsKeyMsgUUIDSet, uuid.String()})
|
||||
bytes, err := ms.ds.Get(ctx, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return types.DecodeSignedMessage(bytes)
|
||||
}
|
||||
|
||||
func (ms *MessageSigner) StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error {
|
||||
|
||||
key := datastore.KeyWithNamespaces([]string{dsKeyMsgUUIDSet, uuid.String()})
|
||||
serializedMsg, err := message.Serialize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ms.ds.Put(ctx, key, serializedMsg)
|
||||
}
|
||||
|
||||
// nextNonce gets the next nonce for the given address.
|
||||
// If there is no nonce in the datastore, gets the nonce from the message pool.
|
||||
func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
||||
|
@ -150,7 +150,7 @@ var runCmd = &cli.Command{
|
||||
node.Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
|
||||
return multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("miner-api"))
|
||||
})),
|
||||
node.Override(new(v1api.FullNode), nodeApi),
|
||||
node.Override(new(v1api.RawFullNodeAPI), nodeApi),
|
||||
)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("creating node: %w", err)
|
||||
|
@ -2231,7 +2231,8 @@ Inputs:
|
||||
}
|
||||
},
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
[
|
||||
{
|
||||
@ -2705,7 +2706,8 @@ Inputs:
|
||||
}
|
||||
],
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
]
|
||||
```
|
||||
@ -2963,7 +2965,8 @@ Inputs:
|
||||
}
|
||||
},
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
@ -2243,7 +2243,8 @@ Inputs:
|
||||
}
|
||||
},
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
[
|
||||
{
|
||||
@ -2717,7 +2718,8 @@ Inputs:
|
||||
}
|
||||
],
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
]
|
||||
```
|
||||
@ -3106,7 +3108,8 @@ Inputs:
|
||||
}
|
||||
},
|
||||
{
|
||||
"MaxFee": "0"
|
||||
"MaxFee": "0",
|
||||
"MsgUuid": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
60
itests/dup_mpool_messages_test.go
Normal file
60
itests/dup_mpool_messages_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
uuid2 "github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
)
|
||||
|
||||
func TestDuplicateMpoolMessages(t *testing.T) {
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
blockTime := 50 * time.Millisecond
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
|
||||
ens.InterconnectAll().BeginMining(blockTime)
|
||||
|
||||
msgBal := &types.Message{
|
||||
From: client.DefaultKey.Address,
|
||||
To: builtin.BurntFundsActorAddr,
|
||||
Value: big.NewInt(10000),
|
||||
}
|
||||
|
||||
uuid := uuid2.New()
|
||||
msgSpec := &api.MessageSendSpec{MsgUuid: uuid}
|
||||
|
||||
msg, err := client.MpoolPushMessage(ctx, msgBal, msgSpec)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = client.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
remBal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
|
||||
require.NoError(t, err)
|
||||
|
||||
msg2, err := client.MpoolPushMessage(ctx, msgBal, msgSpec)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, msg, msg2)
|
||||
|
||||
_, err = client.StateWaitMsg(ctx, msg2.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
currBal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, remBal, currBal)
|
||||
}
|
@ -628,8 +628,7 @@ func (n *Ensemble) Start() *Ensemble {
|
||||
node.Test(),
|
||||
|
||||
node.If(m.options.disableLibp2p, node.MockHost(n.mn)),
|
||||
|
||||
node.Override(new(v1api.FullNode), m.FullNode.FullNode),
|
||||
node.Override(new(v1api.RawFullNodeAPI), m.FullNode.FullNode),
|
||||
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
|
||||
|
||||
// disable resource filtering so that local worker gets assigned tasks
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
provider "github.com/filecoin-project/index-provider"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/v1api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/gen"
|
||||
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
||||
@ -77,6 +78,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
|
||||
return Options(
|
||||
|
||||
Override(new(v1api.FullNode), modules.MakeUuidWrapper),
|
||||
// Needed to instantiate pubsub used by index provider via ConfigCommon
|
||||
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
|
||||
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
|
||||
|
@ -141,6 +141,16 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
||||
cp := *msg
|
||||
msg = &cp
|
||||
inMsg := *msg
|
||||
|
||||
// Check if this uuid has already been processed
|
||||
if spec != nil {
|
||||
signedMessage, err := a.MessageSigner.GetSignedMessage(ctx, spec.MsgUuid)
|
||||
if err == nil {
|
||||
log.Warnf("Message already processed. cid=%s", signedMessage.Cid())
|
||||
return signedMessage, nil
|
||||
}
|
||||
}
|
||||
|
||||
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting key address: %w", err)
|
||||
@ -185,12 +195,25 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
||||
}
|
||||
|
||||
// Sign and push the message
|
||||
return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
|
||||
signedMsg, err := a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
|
||||
if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil {
|
||||
return xerrors.Errorf("mpool push: failed to push message: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Store uuid->signed message in datastore
|
||||
if spec != nil {
|
||||
err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return signedMsg, nil
|
||||
}
|
||||
|
||||
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
@ -79,6 +80,22 @@ var (
|
||||
StagingAreaDirName = "deal-staging"
|
||||
)
|
||||
|
||||
type UuidWrapper struct {
|
||||
v1api.FullNode
|
||||
}
|
||||
|
||||
func (a *UuidWrapper) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||
if spec == nil {
|
||||
spec = new(api.MessageSendSpec)
|
||||
}
|
||||
spec.MsgUuid = uuid.New()
|
||||
return a.FullNode.MpoolPushMessage(ctx, msg, spec)
|
||||
}
|
||||
|
||||
func MakeUuidWrapper(a v1api.RawFullNodeAPI) v1api.FullNode {
|
||||
return &UuidWrapper{a}
|
||||
}
|
||||
|
||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||
maddrb, err := ds.Get(context.TODO(), datastore.NewKey("miner-address"))
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user