Add Auth func for gorpc and address comments

This commit is contained in:
Shrenuj Bansal 2022-09-21 15:41:10 -04:00
parent 81c729e09c
commit 1fe4aa3467
6 changed files with 32 additions and 35 deletions

View File

@ -37,7 +37,7 @@ type MsgSigner interface {
StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error
NextNonce(ctx context.Context, addr address.Address) (uint64, error)
SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error
DstoreKey(addr address.Address) datastore.Key
dstoreKey(addr address.Address) datastore.Key
IsLeader(ctx context.Context) bool
RaftLeader(ctx context.Context) (peer.ID, error)
RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
@ -143,7 +143,7 @@ func (ms *MessageSigner) NextNonce(ctx context.Context, addr address.Address) (u
}
// Get the next nonce for this address from the datastore
addrNonceKey := ms.DstoreKey(addr)
addrNonceKey := ms.dstoreKey(addr)
dsNonceBytes, err := ms.ds.Get(ctx, addrNonceKey)
switch {
@ -183,7 +183,7 @@ func (ms *MessageSigner) SaveNonce(ctx context.Context, addr address.Address, no
nonce++
// Write the nonce to the datastore
addrNonceKey := ms.DstoreKey(addr)
addrNonceKey := ms.dstoreKey(addr)
buf := bytes.Buffer{}
_, err := buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
if err != nil {
@ -196,7 +196,7 @@ func (ms *MessageSigner) SaveNonce(ctx context.Context, addr address.Address, no
return nil
}
func (ms *MessageSigner) DstoreKey(addr address.Address) datastore.Key {
func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
}

View File

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/exp/slices"
"sort"
"time"
@ -72,6 +73,8 @@ type Consensus struct {
rpcReady chan struct{}
readyCh chan struct{}
peerSet []peer.ID
//shutdownLock sync.RWMutex
//shutdown bool
}
@ -115,6 +118,7 @@ func NewConsensus(host host.Host, cfg *config.ClusterRaftConfig, staging bool) (
actor: actor,
state: state,
raft: raft,
peerSet: cfg.InitPeerset,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
}
@ -146,7 +150,7 @@ func (cc *Consensus) WaitForSync(ctx context.Context) error {
leaderCtx, cancel := context.WithTimeout(
ctx,
cc.config.WaitForLeaderTimeout)
time.Duration(cc.config.WaitForLeaderTimeout))
defer cancel()
// 1 - wait for leader
@ -258,7 +262,7 @@ func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} {
// IsTrustedPeer returns true. In Raft we trust all peers.
func (cc *Consensus) IsTrustedPeer(ctx context.Context, p peer.ID) bool {
return true
return slices.Contains(cc.peerSet, p)
}
// Trust is a no-Op.
@ -287,7 +291,7 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf
logger.Warn("there seems to be no leader. Waiting for one")
rctx, cancel := context.WithTimeout(
ctx,
cc.config.WaitForLeaderTimeout,
time.Duration(cc.config.WaitForLeaderTimeout),
)
defer cancel()
pidstr, err := cc.raft.WaitForLeader(rctx)
@ -303,7 +307,8 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf
}
}
logger.Infof("leader: %s, curr host: &s", leader, cc.host.ID())
logger.Infof("leader: %s, curr host: %s, peerSet: %s", leader, cc.host.ID(), cc.peerSet)
// We are the leader. Do not redirect
if leader == cc.host.ID() {
return false, nil
@ -374,7 +379,7 @@ func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
}
RETRY:
time.Sleep(cc.config.CommitRetryDelay)
time.Sleep(time.Duration(cc.config.CommitRetryDelay))
}
return finalErr
}
@ -402,7 +407,7 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
//cc.shutdownLock.RUnlock()
if finalErr != nil {
time.Sleep(cc.config.CommitRetryDelay)
time.Sleep(time.Duration(cc.config.CommitRetryDelay))
continue
}
logger.Infof("peer added to Raft: %s", pid.Pretty())
@ -432,7 +437,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
finalErr = cc.raft.RemovePeer(ctx, peer.Encode(pid))
//cc.shutdownLock.RUnlock()
if finalErr != nil {
time.Sleep(cc.config.CommitRetryDelay)
time.Sleep(time.Duration(cc.config.CommitRetryDelay))
continue
}
logger.Infof("peer removed from Raft: %s", pid.Pretty())

View File

@ -120,7 +120,7 @@ func (rw *raftWrapper) makeTransport() (err error) {
logger.Debug("creating libp2p Raft transport")
rw.transport, err = p2praft.NewLibp2pTransport(
rw.host,
rw.config.NetworkTimeout,
time.Duration(rw.config.NetworkTimeout),
)
return err
}

View File

@ -293,10 +293,10 @@ func DefaultClusterRaftConfig() *ClusterRaftConfig {
var cfg ClusterRaftConfig
cfg.DataFolder = "" // empty so it gets omitted
cfg.InitPeerset = []peer.ID{}
cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout
cfg.NetworkTimeout = DefaultNetworkTimeout
cfg.WaitForLeaderTimeout = Duration(DefaultWaitForLeaderTimeout)
cfg.NetworkTimeout = Duration(DefaultNetworkTimeout)
cfg.CommitRetries = DefaultCommitRetries
cfg.CommitRetryDelay = DefaultCommitRetryDelay
cfg.CommitRetryDelay = Duration(DefaultCommitRetryDelay)
cfg.BackupsRotate = DefaultBackupsRotate
cfg.DatastoreNamespace = DefaultDatastoreNamespace
cfg.RaftConfig = hraft.DefaultConfig()

View File

@ -1,8 +1,6 @@
package config
import (
"time"
hraft "github.com/hashicorp/raft"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
@ -610,15 +608,15 @@ type ClusterRaftConfig struct {
InitPeerset []peer.ID
// LeaderTimeout specifies how long to wait for a leader before
// failing an operation.
WaitForLeaderTimeout time.Duration
WaitForLeaderTimeout Duration
// NetworkTimeout specifies how long before a Raft network
// operation is timed out
NetworkTimeout time.Duration
NetworkTimeout Duration
// CommitRetries specifies how many times we retry a failed commit until
// we give up.
CommitRetries int
// How long to wait between retries
CommitRetryDelay time.Duration
CommitRetryDelay Duration
// BackupsRotate specifies the maximum number of Raft's DataFolder
// copies that we keep as backups (renaming) after cleanup.
BackupsRotate int

View File

@ -44,23 +44,17 @@ func (h *RPCHandler) AddPeer(ctx context.Context, pid peer.ID, ret *struct{}) er
// Add other consensus RPC calls here
func NewRPCClient(host host.Host) *rpc.Client {
protocolID := protocol.ID("/p2p/rpc/ping")
protocolID := protocol.ID("/rpc/lotus-chain/v0")
return rpc.NewClient(host, protocolID)
}
func NewRPCServer(host host.Host, rpcHandler *RPCHandler) error {
protocolID := protocol.ID("/p2p/rpc/ping")
rpcServer := rpc.NewServer(host, protocolID)
func NewRPCServer(ctx context.Context, host host.Host, rpcHandler *RPCHandler) error {
authF := func(pid peer.ID, svc, method string) bool {
return rpcHandler.cons.IsTrustedPeer(ctx, pid)
}
protocolID := protocol.ID("/rpc/lotus-chain/v0")
rpcServer := rpc.NewServer(host, protocolID, rpc.WithAuthorizeFunc(authF))
return rpcServer.RegisterName("Consensus", rpcHandler)
//return err
}
// contructorsfor rpc client and rpc server
// rpc handler
// rpcClient
// Consensus
// MessageSigner
// MpoolAPI
// RPC handler
// RPC server