Add uuid to mpool message sent to chain node from miner
This commit is contained in:
parent
f27ef344bc
commit
fa4a479b97
@ -5,8 +5,10 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-graphsync"
|
"github.com/ipfs/go-graphsync"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/network"
|
"github.com/libp2p/go-libp2p-core/network"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
@ -54,6 +56,7 @@ type PubsubScore struct {
|
|||||||
|
|
||||||
type MessageSendSpec struct {
|
type MessageSendSpec struct {
|
||||||
MaxFee abi.TokenAmount
|
MaxFee abi.TokenAmount
|
||||||
|
MsgUuid uuid.UUID
|
||||||
}
|
}
|
||||||
|
|
||||||
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
|
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
|
||||||
|
@ -7,6 +7,8 @@ import (
|
|||||||
type FullNode = api.FullNode
|
type FullNode = api.FullNode
|
||||||
type FullNodeStruct = api.FullNodeStruct
|
type FullNodeStruct = api.FullNodeStruct
|
||||||
|
|
||||||
|
type RawFullNodeAPI FullNode
|
||||||
|
|
||||||
func PermissionedFullAPI(a FullNode) FullNode {
|
func PermissionedFullAPI(a FullNode) FullNode {
|
||||||
return api.PermissionedFullAPI(a)
|
return api.PermissionedFullAPI(a)
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package messagesigner
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/google/uuid"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
@ -19,6 +20,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const dsKeyActorNonce = "ActorNextNonce"
|
const dsKeyActorNonce = "ActorNextNonce"
|
||||||
|
const dsKeyMsgUuidSet = "MsgUuidSet"
|
||||||
|
|
||||||
var log = logging.Logger("messagesigner")
|
var log = logging.Logger("messagesigner")
|
||||||
|
|
||||||
@ -91,6 +93,26 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
|
|||||||
return smsg, nil
|
return smsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ms *MessageSigner) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
|
||||||
|
|
||||||
|
key := datastore.KeyWithNamespaces([]string{dsKeyMsgUuidSet, uuid.String()})
|
||||||
|
bytes, err := ms.ds.Get(ctx, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return types.DecodeSignedMessage(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *MessageSigner) StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error {
|
||||||
|
|
||||||
|
key := datastore.KeyWithNamespaces([]string{dsKeyMsgUuidSet, uuid.String()})
|
||||||
|
serializedMsg, err := message.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return ms.ds.Put(ctx, key, serializedMsg)
|
||||||
|
}
|
||||||
|
|
||||||
// nextNonce gets the next nonce for the given address.
|
// nextNonce gets the next nonce for the given address.
|
||||||
// If there is no nonce in the datastore, gets the nonce from the message pool.
|
// If there is no nonce in the datastore, gets the nonce from the message pool.
|
||||||
func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
||||||
|
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/api/v1api"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
@ -14,7 +15,6 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/api/v0api"
|
"github.com/filecoin-project/lotus/api/v0api"
|
||||||
"github.com/filecoin-project/lotus/api/v1api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
"github.com/filecoin-project/lotus/lib/ulimit"
|
"github.com/filecoin-project/lotus/lib/ulimit"
|
||||||
@ -150,7 +150,7 @@ var runCmd = &cli.Command{
|
|||||||
node.Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
|
node.Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
|
||||||
return multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("miner-api"))
|
return multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/" + cctx.String("miner-api"))
|
||||||
})),
|
})),
|
||||||
node.Override(new(v1api.FullNode), nodeApi),
|
node.Override(new(v1api.RawFullNodeAPI), nodeApi),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("creating node: %w", err)
|
return xerrors.Errorf("creating node: %w", err)
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/filecoin-project/lotus/api/v1api"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -32,7 +33,6 @@ import (
|
|||||||
power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power"
|
power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/api/v1api"
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain"
|
"github.com/filecoin-project/lotus/chain"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
@ -628,8 +628,7 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
node.Test(),
|
node.Test(),
|
||||||
|
|
||||||
node.If(m.options.disableLibp2p, node.MockHost(n.mn)),
|
node.If(m.options.disableLibp2p, node.MockHost(n.mn)),
|
||||||
|
node.Override(new(v1api.RawFullNodeAPI), m.FullNode.FullNode),
|
||||||
node.Override(new(v1api.FullNode), m.FullNode.FullNode),
|
|
||||||
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
|
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
|
||||||
|
|
||||||
// disable resource filtering so that local worker gets assigned tasks
|
// disable resource filtering so that local worker gets assigned tasks
|
||||||
|
@ -2,6 +2,7 @@ package node
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"github.com/filecoin-project/lotus/api/v1api"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
@ -77,6 +78,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
|
|
||||||
return Options(
|
return Options(
|
||||||
|
|
||||||
|
Override(new(v1api.FullNode), modules.GetUuidWrapper),
|
||||||
// Needed to instantiate pubsub used by index provider via ConfigCommon
|
// Needed to instantiate pubsub used by index provider via ConfigCommon
|
||||||
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
|
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
|
||||||
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
|
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
|
||||||
|
@ -3,12 +3,12 @@ package full
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
@ -141,6 +141,16 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
|||||||
cp := *msg
|
cp := *msg
|
||||||
msg = &cp
|
msg = &cp
|
||||||
inMsg := *msg
|
inMsg := *msg
|
||||||
|
|
||||||
|
// Check if this uuid has already been processed
|
||||||
|
if spec != nil {
|
||||||
|
signedMessage, err := a.MessageSigner.GetSignedMessage(ctx, spec.MsgUuid)
|
||||||
|
if err == nil {
|
||||||
|
log.Warnf("Message already processed. cid=%s", signedMessage.Cid())
|
||||||
|
return signedMessage, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
|
fromA, err := a.Stmgr.ResolveToKeyAddress(ctx, msg.From, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting key address: %w", err)
|
return nil, xerrors.Errorf("getting key address: %w", err)
|
||||||
@ -185,12 +195,25 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Sign and push the message
|
// Sign and push the message
|
||||||
return a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
|
signedMsg, err := a.MessageSigner.SignMessage(ctx, msg, func(smsg *types.SignedMessage) error {
|
||||||
if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil {
|
if _, err := a.MpoolModuleAPI.MpoolPush(ctx, smsg); err != nil {
|
||||||
return xerrors.Errorf("mpool push: failed to push message: %w", err)
|
return xerrors.Errorf("mpool push: failed to push message: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store uuid->signed message in datastore
|
||||||
|
if spec != nil {
|
||||||
|
err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return signedMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/google/uuid"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@ -79,6 +80,19 @@ var (
|
|||||||
StagingAreaDirName = "deal-staging"
|
StagingAreaDirName = "deal-staging"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type UuidWrapper struct {
|
||||||
|
v1api.FullNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *UuidWrapper) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) {
|
||||||
|
spec.MsgUuid = uuid.New()
|
||||||
|
return a.FullNode.MpoolPushMessage(ctx, msg, spec)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetUuidWrapper(a v1api.RawFullNodeAPI) v1api.FullNode {
|
||||||
|
return &UuidWrapper{a}
|
||||||
|
}
|
||||||
|
|
||||||
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
|
||||||
maddrb, err := ds.Get(context.TODO(), datastore.NewKey("miner-address"))
|
maddrb, err := ds.Get(context.TODO(), datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user