diff --git a/chain/messagesigner/messagesigner_consensus.go b/chain/messagesigner/messagesigner_consensus.go index d0ef11166..905bb7199 100644 --- a/chain/messagesigner/messagesigner_consensus.go +++ b/chain/messagesigner/messagesigner_consensus.go @@ -18,7 +18,7 @@ import ( type MessageSignerConsensus struct { MsgSigner - consensus *consensus.Consensus + Consensus *consensus.Consensus } func NewMessageSignerConsensus( @@ -34,16 +34,16 @@ func NewMessageSignerConsensus( mpool: mpool, ds: ds, }, - consensus: consensus, + Consensus: consensus, } } func (ms *MessageSignerConsensus) IsLeader(ctx context.Context) bool { - return ms.consensus.IsLeader(ctx) + return ms.Consensus.IsLeader(ctx) } func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { - ok, err := ms.consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage)) + ok, err := ms.Consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage)) if err != nil { return ok, err } @@ -61,19 +61,13 @@ func (ms *MessageSignerConsensus) SignMessage( return nil, err } - // We can't have an empty/default uuid as part of the consensus state so generate a new uuid if spec is empty - u := uuid.New() - if spec != nil { - u = spec.MsgUuid - } - op := &consensus.ConsensusOp{ Nonce: signedMsg.Message.Nonce, - Uuid: u, + Uuid: spec.MsgUuid, Addr: signedMsg.Message.From, SignedMsg: signedMsg, } - err = ms.consensus.Commit(ctx, op) + err = ms.Consensus.Commit(ctx, op) if err != nil { return nil, err } @@ -82,12 +76,12 @@ func (ms *MessageSignerConsensus) SignMessage( } func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) { - cstate, err := ms.consensus.State(ctx) + cstate, err := ms.Consensus.State(ctx) if err != nil { return nil, err } - //cstate := state.(consensus.RaftState) + //cstate := state.(Consensus.RaftState) msg, ok := cstate.MsgUuids[uuid] if !ok { return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid) @@ -96,9 +90,9 @@ func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uui } func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (*consensus.RaftState, error) { - return ms.consensus.State(ctx) + return ms.Consensus.State(ctx) } func (ms *MessageSignerConsensus) Leader(ctx context.Context) (peer.ID, error) { - return ms.consensus.Leader(ctx) + return ms.Consensus.Leader(ctx) } diff --git a/cli/util/api.go b/cli/util/api.go index f2575809c..d3d0b4c6f 100644 --- a/cli/util/api.go +++ b/cli/util/api.go @@ -49,7 +49,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) { strma := ctx.String(f) strma = strings.TrimSpace(strma) - return []APIInfo{APIInfo{Addr: strma}}, nil + return []APIInfo{{Addr: strma}}, nil } // @@ -106,7 +106,7 @@ func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) { log.Warnf("Couldn't load CLI token, capabilities may be limited: %v", err) } - return []APIInfo{APIInfo{ + return []APIInfo{{ Addr: ma.String(), Token: token, }}, nil @@ -225,9 +225,11 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err return client.NewFullNodeRPCV0(ctx.Context, addr, headers) } +type contextKey string + // Not thread safe func OnSingleNode(ctx context.Context) context.Context { - return context.WithValue(ctx, "retry-node", new(*int)) + return context.WithValue(ctx, contextKey("retry-node"), new(*int)) } func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) { @@ -262,11 +264,12 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) { // for calls that need to be performed on the same node // primarily for miner when calling create block and submit block subsequently - if ctx.Value("retry-node") != nil { - if (*ctx.Value("retry-node").(**int)) == nil { - *ctx.Value("retry-node").(**int) = &curr + key := contextKey("retry-node") + if ctx.Value(key) != nil { + if (*ctx.Value(key).(**int)) == nil { + *ctx.Value(key).(**int) = &curr } else { - curr = **ctx.Value("retry-node").(**int) - 1 + curr = **ctx.Value(key).(**int) - 1 } } diff --git a/cli/util/apiinfo.go b/cli/util/apiinfo.go index b22e35933..68befdb55 100644 --- a/cli/util/apiinfo.go +++ b/cli/util/apiinfo.go @@ -40,11 +40,10 @@ func ParseApiInfo(s string) APIInfo { func ParseApiInfoMulti(s string) []APIInfo { var apiInfos []APIInfo - if infoWithToken.Match([]byte(s)) { + allAddrs := strings.SplitN(s, ",", -1) - allAddrs := strings.SplitN(s, ",", -1) - - for _, addr := range allAddrs { + for _, addr := range allAddrs { + if infoWithToken.Match([]byte(addr)) { sp := strings.SplitN(addr, ":", 2) apiInfos = append(apiInfos, APIInfo{ Addr: sp[1], diff --git a/itests/raft_messagesigner_test.go b/itests/raft_messagesigner_test.go index c1a0c3ab8..87f8f28ff 100644 --- a/itests/raft_messagesigner_test.go +++ b/itests/raft_messagesigner_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "github.com/filecoin-project/lotus/node/impl" "reflect" "testing" "time" @@ -486,3 +487,89 @@ func TestChainStoreSync(t *testing.T) { } } } + +func TestGoRPCAuth(t *testing.T) { + + kit.QuietMiningLogs() + ctx := context.Background() + + var ( + node0 kit.TestFullNode + node1 kit.TestFullNode + node2 kit.TestFullNode + node3 kit.TestFullNode + miner kit.TestMiner + ) + + pkey0, _ := generatePrivKey() + pkey1, _ := generatePrivKey() + pkey2, _ := generatePrivKey() + + pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2} + initPeerSet := []string{} + for _, pkey := range pkeys { + initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String()) + } + + raftOps := kit.ConstructorOpts( + node.Override(new(*gorpc.Client), modules.NewRPCClient), + node.Override(new(*consensus.ClusterRaftConfig), func() *consensus.ClusterRaftConfig { + cfg := consensus.DefaultClusterRaftConfig() + cfg.InitPeerset = initPeerSet + return cfg + }), + node.Override(new(*consensus.Consensus), consensus.NewConsensusWithRPCClient(false)), + node.Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus), + node.Override(new(messagesigner.MsgSigner), func(ms *messagesigner.MessageSignerConsensus) *messagesigner.MessageSignerConsensus { return ms }), + node.Override(new(*modules.RPCHandler), modules.NewRPCHandler), + node.Override(node.GoRPCServer, modules.NewRPCServer), + ) + //raftOps := kit.ConstructorOpts() + kit.ThroughRPC() + + ens := kit.NewEnsemble(t).FullNode(&node0, raftOps, kit.ThroughRPC()).FullNode(&node1, raftOps, kit.ThroughRPC()).FullNode(&node2, raftOps, kit.ThroughRPC()).FullNode(&node3, raftOps) + node0.AssignPrivKey(pkey0) + node1.AssignPrivKey(pkey1) + node2.AssignPrivKey(pkey2) + + nodes := []*kit.TestFullNode{&node0, &node1, &node2} + wrappedFullNode := kit.MergeFullNodes(nodes) + + ens.MinerEnroll(&miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC()) + ens.Start() + + // Import miner wallet to all nodes + addr0, err := node0.WalletImport(ctx, &miner.OwnerKey.KeyInfo) + require.NoError(t, err) + addr1, err := node1.WalletImport(ctx, &miner.OwnerKey.KeyInfo) + require.NoError(t, err) + addr2, err := node2.WalletImport(ctx, &miner.OwnerKey.KeyInfo) + require.NoError(t, err) + + fmt.Println(addr0, addr1, addr2) + + ens.InterconnectAll() + + ens.AddInactiveMiner(&miner) + ens.Start() + + ens.InterconnectAll().BeginMining(blockTime) + + leader, err := node0.RaftLeader(ctx) + require.NoError(t, err) + + client := node3.FullNode.(*impl.FullNodeAPI).RaftAPI.MessageSigner.Consensus.RpcClient + method := "MpoolPushMessage" + + msg := &types.Message{ + From: miner.OwnerKey.Address, + To: node0.DefaultKey.Address, + Value: big.NewInt(100000), + } + msgWhole := &api.MpoolMessageWhole{Msg: msg} + var ret types.SignedMessage + + err = client.CallContext(ctx, leader, "Consensus", method, msgWhole, &ret) + require.True(t, gorpc.IsAuthorizationError(err)) + +} diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go index 4fc935147..93536f584 100644 --- a/lib/consensus/raft/consensus.go +++ b/lib/consensus/raft/consensus.go @@ -115,7 +115,7 @@ type Consensus struct { raft *raftWrapper state *RaftState - rpcClient *rpc.Client + RpcClient *rpc.Client rpcReady chan struct{} readyCh chan struct{} @@ -197,7 +197,7 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host, if err != nil { return nil, err } - cc.rpcClient = rpcClient + cc.RpcClient = rpcClient cc.rpcReady <- struct{}{} return cc, nil } @@ -367,7 +367,7 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf } logger.Debugf("redirecting %s to leader: %s", method, leader.Pretty()) - finalErr = cc.rpcClient.CallContext( + finalErr = cc.RpcClient.CallContext( ctx, leader, "Consensus", @@ -488,7 +488,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error { finalErr = cc.raft.RemovePeer(ctx, peer.Encode(pid)) //cc.shutdownLock.RUnlock() if finalErr != nil { - time.Sleep(time.Duration(cc.config.CommitRetryDelay)) + time.Sleep(cc.config.CommitRetryDelay) continue } logger.Infof("peer removed from Raft: %s", pid.Pretty()) diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 8a6236db0..31d134dac 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -159,8 +159,15 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe } } - // Check if this uuid has already been processed. Ignore if uuid is not populated - if (spec != nil) && (spec.MsgUuid != uuid.UUID{}) { + // Generate spec and uuid if not available in the message + if spec == nil { + spec = &api.MessageSendSpec{ + MsgUuid: uuid.New(), + } + } else if (spec.MsgUuid == uuid.UUID{}) { + spec.MsgUuid = uuid.New() + } else { + // Check if this uuid has already been processed. Ignore if uuid is not populated signedMessage, err := a.MessageSigner.GetSignedMessage(ctx, spec.MsgUuid) if err == nil { log.Warnf("Message already processed. cid=%s", signedMessage.Cid()) @@ -223,11 +230,9 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe } // Store uuid->signed message in datastore - if (spec != nil) && (spec.MsgUuid != uuid.UUID{}) { - err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg) - if err != nil { - return nil, err - } + err = a.MessageSigner.StoreSignedMessage(ctx, spec.MsgUuid, signedMsg) + if err != nil { + return nil, err } return signedMsg, nil