Address more comments and add test for gorpc auth

This commit is contained in:
Shrenuj Bansal 2022-10-18 15:53:42 -04:00
parent 15ed1ee33c
commit ad8b959880
6 changed files with 126 additions and 38 deletions

View File

@ -18,7 +18,7 @@ import (
type MessageSignerConsensus struct {
MsgSigner
consensus *consensus.Consensus
Consensus *consensus.Consensus
}
func NewMessageSignerConsensus(
@ -34,16 +34,16 @@ func NewMessageSignerConsensus(
mpool: mpool,
ds: ds,
},
consensus: consensus,
Consensus: consensus,
}
}
func (ms *MessageSignerConsensus) IsLeader(ctx context.Context) bool {
return ms.consensus.IsLeader(ctx)
return ms.Consensus.IsLeader(ctx)
}
func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
ok, err := ms.consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage))
ok, err := ms.Consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage))
if err != nil {
return ok, err
}
@ -61,19 +61,13 @@ func (ms *MessageSignerConsensus) SignMessage(
return nil, err
}
// We can't have an empty/default uuid as part of the consensus state so generate a new uuid if spec is empty
u := uuid.New()
if spec != nil {
u = spec.MsgUuid
}
op := &consensus.ConsensusOp{
Nonce: signedMsg.Message.Nonce,
Uuid: u,
Uuid: spec.MsgUuid,
Addr: signedMsg.Message.From,
SignedMsg: signedMsg,
}
err = ms.consensus.Commit(ctx, op)
err = ms.Consensus.Commit(ctx, op)
if err != nil {
return nil, err
}
@ -82,12 +76,12 @@ func (ms *MessageSignerConsensus) SignMessage(
}
func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
cstate, err := ms.consensus.State(ctx)
cstate, err := ms.Consensus.State(ctx)
if err != nil {
return nil, err
}
//cstate := state.(consensus.RaftState)
//cstate := state.(Consensus.RaftState)
msg, ok := cstate.MsgUuids[uuid]
if !ok {
return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid)
@ -96,9 +90,9 @@ func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uui
}
func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (*consensus.RaftState, error) {
return ms.consensus.State(ctx)
return ms.Consensus.State(ctx)
}
func (ms *MessageSignerConsensus) Leader(ctx context.Context) (peer.ID, error) {
return ms.consensus.Leader(ctx)
return ms.Consensus.Leader(ctx)
}

View File

@ -49,7 +49,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) {
strma := ctx.String(f)
strma = strings.TrimSpace(strma)
return []APIInfo{APIInfo{Addr: strma}}, nil
return []APIInfo{{Addr: strma}}, nil
}
//
@ -106,7 +106,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) {
log.Warnf("Couldn't load CLI token, capabilities may be limited: %v", err)
}
return []APIInfo{APIInfo{
return []APIInfo{{
Addr: ma.String(),
Token: token,
}}, nil
@ -225,9 +225,11 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err
return client.NewFullNodeRPCV0(ctx.Context, addr, headers)
}
type contextKey string
// Not thread safe
func OnSingleNode(ctx context.Context) context.Context {
return context.WithValue(ctx, "retry-node", new(*int))
return context.WithValue(ctx, contextKey("retry-node"), new(*int))
}
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
@ -262,11 +264,12 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
if ctx.Value("retry-node") != nil {
if (*ctx.Value("retry-node").(**int)) == nil {
*ctx.Value("retry-node").(**int) = &curr
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
*ctx.Value(key).(**int) = &curr
} else {
curr = **ctx.Value("retry-node").(**int) - 1
curr = **ctx.Value(key).(**int) - 1
}
}

View File

@ -40,11 +40,10 @@ func ParseApiInfo(s string) APIInfo {
func ParseApiInfoMulti(s string) []APIInfo {
var apiInfos []APIInfo
if infoWithToken.Match([]byte(s)) {
allAddrs := strings.SplitN(s, ",", -1)
allAddrs := strings.SplitN(s, ",", -1)
for _, addr := range allAddrs {
for _, addr := range allAddrs {
if infoWithToken.Match([]byte(addr)) {
sp := strings.SplitN(addr, ":", 2)
apiInfos = append(apiInfos, APIInfo{
Addr: sp[1],

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"github.com/filecoin-project/lotus/node/impl"
"reflect"
"testing"
"time"
@ -486,3 +487,89 @@ func TestChainStoreSync(t *testing.T) {
}
}
}
func TestGoRPCAuth(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
var (
node0 kit.TestFullNode
node1 kit.TestFullNode
node2 kit.TestFullNode
node3 kit.TestFullNode
miner kit.TestMiner
)
pkey0, _ := generatePrivKey()
pkey1, _ := generatePrivKey()
pkey2, _ := generatePrivKey()
pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2}
initPeerSet := []string{}
for _, pkey := range pkeys {
initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String())
}
raftOps := kit.ConstructorOpts(
node.Override(new(*gorpc.Client), modules.NewRPCClient),
node.Override(new(*consensus.ClusterRaftConfig), func() *consensus.ClusterRaftConfig {
cfg := consensus.DefaultClusterRaftConfig()
cfg.InitPeerset = initPeerSet
return cfg
}),
node.Override(new(*consensus.Consensus), consensus.NewConsensusWithRPCClient(false)),
node.Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus),
node.Override(new(messagesigner.MsgSigner), func(ms *messagesigner.MessageSignerConsensus) *messagesigner.MessageSignerConsensus { return ms }),
node.Override(new(*modules.RPCHandler), modules.NewRPCHandler),
node.Override(node.GoRPCServer, modules.NewRPCServer),
)
//raftOps := kit.ConstructorOpts()
kit.ThroughRPC()
ens := kit.NewEnsemble(t).FullNode(&node0, raftOps, kit.ThroughRPC()).FullNode(&node1, raftOps, kit.ThroughRPC()).FullNode(&node2, raftOps, kit.ThroughRPC()).FullNode(&node3, raftOps)
node0.AssignPrivKey(pkey0)
node1.AssignPrivKey(pkey1)
node2.AssignPrivKey(pkey2)
nodes := []*kit.TestFullNode{&node0, &node1, &node2}
wrappedFullNode := kit.MergeFullNodes(nodes)
ens.MinerEnroll(&miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC())
ens.Start()
// Import miner wallet to all nodes
addr0, err := node0.WalletImport(ctx, &miner.OwnerKey.KeyInfo)
require.NoError(t, err)
addr1, err := node1.WalletImport(ctx, &miner.OwnerKey.KeyInfo)
require.NoError(t, err)
addr2, err := node2.WalletImport(ctx, &miner.OwnerKey.KeyInfo)
require.NoError(t, err)
fmt.Println(addr0, addr1, addr2)
ens.InterconnectAll()
ens.AddInactiveMiner(&miner)
ens.Start()
ens.InterconnectAll().BeginMining(blockTime)
leader, err := node0.RaftLeader(ctx)
require.NoError(t, err)
client := node3.FullNode.(*impl.FullNodeAPI).RaftAPI.MessageSigner.Consensus.RpcClient
method := "MpoolPushMessage"
msg := &types.Message{
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.NewInt(100000),
}
msgWhole := &api.MpoolMessageWhole{Msg: msg}
var ret types.SignedMessage
err = client.CallContext(ctx, leader, "Consensus", method, msgWhole, &ret)
require.True(t, gorpc.IsAuthorizationError(err))
}

View File

@ -115,7 +115,7 @@ type Consensus struct {
raft *raftWrapper
state *RaftState
rpcClient *rpc.Client
RpcClient *rpc.Client
rpcReady chan struct{}
readyCh chan struct{}
@ -197,7 +197,7 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host,
if err != nil {
return nil, err
}
cc.rpcClient = rpcClient
cc.RpcClient = rpcClient
cc.rpcReady <- struct{}{}
return cc, nil
}
@ -367,7 +367,7 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf
}
logger.Debugf("redirecting %s to leader: %s", method, leader.Pretty())
finalErr = cc.rpcClient.CallContext(
finalErr = cc.RpcClient.CallContext(
ctx,
leader,
"Consensus",
@ -488,7 +488,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
finalErr = cc.raft.RemovePeer(ctx, peer.Encode(pid))
//cc.shutdownLock.RUnlock()
if finalErr != nil {
time.Sleep(time.Duration(cc.config.CommitRetryDelay))
time.Sleep(cc.config.CommitRetryDelay)
continue
}
logger.Infof("peer removed from Raft: %s", pid.Pretty())

View File

@ -159,8 +159,15 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
}
}
// Check if this uuid has already been processed. Ignore if uuid is not populated
if (spec != nil) && (spec.MsgUuid != uuid.UUID{}) {
// Generate spec and uuid if not available in the message
if spec == nil {
spec = &api.MessageSendSpec{
MsgUuid: uuid.New(),
}
} else if (spec.MsgUuid == uuid.UUID{}) {
spec.MsgUuid = uuid.New()
} else {
// Check if this uuid has already been processed. Ignore if uuid is not populated
signedMessage, err := a.MessageSigner.GetSignedMessage(ctx, spec.MsgUuid)
if err == nil {
log.Warnf("Message already processed. cid=%s", signedMessage.Cid())
@ -223,11 +230,9 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
}
// Store uuid->signed message in datastore
if (spec != nil) && (spec.MsgUuid != uuid.UUID{}) {
err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg)
if err != nil {
return nil, err
}
err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg)
if err != nil {
return nil, err
}
return signedMsg, nil