From 986c5e3c68c57bd02905414e636d75cfb5868fbb Mon Sep 17 00:00:00 2001 From: Shrenuj Bansal Date: Fri, 30 Sep 2022 16:45:04 +0000 Subject: [PATCH] Use multiaddrs in config for raft peerset --- chain/messagesigner/messagesigner_test.go | 3 ++- documentation/en/default-lotus-config.toml | 8 +++--- itests/raft_messagesigner_test.go | 9 +++++-- lib/consensus/raft/config.go | 22 +++++++++++----- lib/consensus/raft/consensus.go | 30 +++++++++++++++++++--- lib/consensus/raft/raft.go | 26 ++++++++++++++----- node/builder_chain.go | 3 ++- node/config/def.go | 3 +-- node/config/doc_gen.go | 6 ++--- node/config/types.go | 5 ++-- 10 files changed, 83 insertions(+), 32 deletions(-) diff --git a/chain/messagesigner/messagesigner_test.go b/chain/messagesigner/messagesigner_test.go index 334fbfe42..637f17b46 100644 --- a/chain/messagesigner/messagesigner_test.go +++ b/chain/messagesigner/messagesigner_test.go @@ -13,6 +13,7 @@ import ( "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet" ) @@ -22,7 +23,7 @@ type mockMpool struct { nonces map[address.Address]uint64 } -var _ MpoolNonceAPI = (*mockMpool)(nil) +var _ messagepool.MpoolNonceAPI = (*mockMpool)(nil) func newMockMpool() *mockMpool { return &mockMpool{nonces: make(map[address.Address]uint64)} diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index a38b7c901..4d2e18ff4 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -245,13 +245,13 @@ # env var: LOTUS_RAFT_DATAFOLDER #DataFolder = "" - # InitPeerset provides the list of initial cluster peers for new Raft + # InitPeersetMultiAddr provides the list of initial cluster peers for new Raft # peers (with no prior state). It is ignored when Raft was already # initialized or when starting in staging mode. 
# - # type: []peer.ID - # env var: LOTUS_RAFT_INITPEERSET - #InitPeerset = [] + # type: []string + # env var: LOTUS_RAFT_INITPEERSETMULTIADDR + #InitPeersetMultiAddr = [] # LeaderTimeout specifies how long to wait for a leader before # failing an operation. diff --git a/itests/raft_messagesigner_test.go b/itests/raft_messagesigner_test.go index 887cf7d31..09f153f8a 100644 --- a/itests/raft_messagesigner_test.go +++ b/itests/raft_messagesigner_test.go @@ -45,7 +45,6 @@ func generatePrivKey() (*kit.Libp2p, error) { func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *api.RaftStateData { raftState, err := node.RaftState(ctx) require.NoError(t, err) - //rstate := raftState.(*consensus.RaftState) return raftState } @@ -57,7 +56,13 @@ func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *ki pkey1, _ := generatePrivKey() pkey2, _ := generatePrivKey() - initPeerSet := []peer.ID{pkey0.PeerID, pkey1.PeerID, pkey2.PeerID} + pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2} + initPeerSet := []string{} + for _, pkey := range pkeys { + initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String()) + } + + //initPeerSet := []peer.ID{pkey0.PeerID, pkey1.PeerID, pkey2.PeerID} raftOps := kit.ConstructorOpts( node.Override(new(*gorpc.Client), modules.NewRPCClient), diff --git a/lib/consensus/raft/config.go b/lib/consensus/raft/config.go index 541f43e16..bf1e74c50 100644 --- a/lib/consensus/raft/config.go +++ b/lib/consensus/raft/config.go @@ -1,16 +1,15 @@ package consensus import ( - "github.com/filecoin-project/lotus/node/repo" "io/ioutil" "path/filepath" "time" hraft "github.com/hashicorp/raft" - "github.com/libp2p/go-libp2p/core/peer" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/repo" ) // ConfigKey is the default configuration key for holding this component's @@ -40,7 +39,7 @@ type ClusterRaftConfig struct { // InitPeerset provides the list of initial cluster peers for 
new Raft // peers (with no prior state). It is ignored when Raft was already // initialized or when starting in staging mode. - InitPeerset []peer.ID + InitPeerset []string // LeaderTimeout specifies how long to wait for a leader before // failing an operation. WaitForLeaderTimeout time.Duration @@ -68,7 +67,7 @@ type ClusterRaftConfig struct { func DefaultClusterRaftConfig() *ClusterRaftConfig { var cfg ClusterRaftConfig cfg.DataFolder = "" // empty so it gets omitted - cfg.InitPeerset = []peer.ID{} + cfg.InitPeerset = []string{} cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout cfg.NetworkTimeout = DefaultNetworkTimeout cfg.CommitRetries = DefaultCommitRetries @@ -90,7 +89,7 @@ func DefaultClusterRaftConfig() *ClusterRaftConfig { func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftConfig { var cfg ClusterRaftConfig cfg.DataFolder = userRaftConfig.DataFolder - cfg.InitPeerset = userRaftConfig.InitPeerset + cfg.InitPeerset = userRaftConfig.InitPeersetMultiAddr cfg.WaitForLeaderTimeout = time.Duration(userRaftConfig.WaitForLeaderTimeout) cfg.NetworkTimeout = time.Duration(userRaftConfig.NetworkTimeout) cfg.CommitRetries = userRaftConfig.CommitRetries @@ -108,6 +107,15 @@ func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftCon // Set up logging cfg.RaftConfig.LogOutput = ioutil.Discard //cfg.RaftConfig.Logger = &hcLogToLogger{} + + //addrInfos, err := addrutil.ParseAddresses(context.Background(), userRaftConfig.InitPeersetMultiAddr) + //if err != nil { + // return nil + //} + //for _, addrInfo := range addrInfos { + // cfg.InitPeerset = append(cfg.InitPeerset, addrInfo.ID) + //} + return &cfg } @@ -355,9 +363,9 @@ func ValidateConfig(cfg *ClusterRaftConfig) error { // GetDataFolder returns the Raft data folder that we are using. 
func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string { if cfg.DataFolder == "" { - return filepath.Join(repo.Path() + DefaultDataSubFolder) + return filepath.Join(repo.Path(), DefaultDataSubFolder) } - return filepath.Join(repo.Path() + cfg.DataFolder) + return filepath.Join(repo.Path(), cfg.DataFolder) } // diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go index 07e9136c0..05d527bb8 100644 --- a/lib/consensus/raft/consensus.go +++ b/lib/consensus/raft/consensus.go @@ -6,7 +6,7 @@ import ( "context" "errors" "fmt" - "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/lib/addrutil" "sort" "time" @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/node/repo" //ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -36,7 +37,21 @@ var logger = logging.Logger("raft") type RaftState struct { NonceMap api.NonceMapType MsgUuids api.MsgUuidMapType - Mpool *messagepool.MessagePool + + // TODO: add comment explaining why this is needed + // We need a reference to the messagepool in the raft state in order to + // sync messages that have been sent by the leader node. + // Miner calls StateWaitMsg after MpoolPushMessage to check if the message has + // landed on chain. This check requires the message be stored in the local chainstore. + // If a leader node goes down after sending a message to the chain and is replaced by + // another node, the other node needs to have this message in its chainstore for the + // above check to succeed. + + // This is because the miner only stores signed CIDs but the message received in a 
Hence, the process relies on the node to store the + // signed message which holds a copy of the unsigned message to properly perform all the + // needed checks + Mpool *messagepool.MessagePool } func newRaftState(mpool *messagepool.MessagePool) *RaftState { @@ -120,6 +135,15 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes actor := libp2praft.NewActor(raft.raft) consensus.SetActor(actor) + peers := []peer.ID{} + addrInfos, err := addrutil.ParseAddresses(ctx, cfg.InitPeerset) + for _, addrInfo := range addrInfos { + peers = append(peers, addrInfo.ID) + + // Add peer to address book + host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Duration(time.Hour*100)) + } + cc := &Consensus{ ctx: ctx, cancel: cancel, @@ -129,7 +153,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes actor: actor, state: state, raft: raft, - peerSet: cfg.InitPeerset, + peerSet: peers, rpcReady: make(chan struct{}, 1), readyCh: make(chan struct{}, 1), repo: repo, diff --git a/lib/consensus/raft/raft.go b/lib/consensus/raft/raft.go index 1fb6195de..0f62368a0 100644 --- a/lib/consensus/raft/raft.go +++ b/lib/consensus/raft/raft.go @@ -4,9 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/filecoin-project/lotus/node/repo" - "github.com/ipfs/go-log/v2" - "go.uber.org/zap" + "github.com/filecoin-project/lotus/lib/addrutil" "io" "os" "path/filepath" @@ -14,9 +12,13 @@ import ( hraft "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" + "github.com/ipfs/go-log/v2" p2praft "github.com/libp2p/go-libp2p-raft" host "github.com/libp2p/go-libp2p/core/host" peer "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/zap" + + "github.com/filecoin-project/lotus/node/repo" ) var raftLogger = log.Logger("raft-cluster") @@ -86,7 +88,10 @@ func newRaftWrapper( return nil, err } - raftW.makeServerConfig() + err = raftW.makeServerConfig() + if err != nil { + return nil, err + } err = raftW.makeTransport() if 
err != nil { @@ -225,8 +230,17 @@ func (rw *raftWrapper) Bootstrap() (bool, error) { // create Raft servers configuration. The result is used // by Bootstrap() when it proceeds to Bootstrap. -func (rw *raftWrapper) makeServerConfig() { - rw.serverConfig = makeServerConf(append(rw.config.InitPeerset, rw.host.ID())) +func (rw *raftWrapper) makeServerConfig() error { + peers := []peer.ID{} + addrInfos, err := addrutil.ParseAddresses(context.Background(), rw.config.InitPeerset) + if err != nil { + return err + } + for _, addrInfo := range addrInfos { + peers = append(peers, addrInfo.ID) + } + rw.serverConfig = makeServerConf(append(peers, rw.host.ID())) + return nil } // creates a server configuration with all peers as Voters. diff --git a/node/builder_chain.go b/node/builder_chain.go index 3cb8af830..ef70df556 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -242,7 +242,8 @@ func ConfigFullNode(c interface{}) Option { // Chain node cluster enabled If(cfg.Raft.ClusterModeEnabled, Override(new(*gorpc.Client), modules.NewRPCClient), - Override(new(*consensus.Consensus), consensus2.NewConsensusWithRPCClient(false)), + Override(new(*consensus2.ClusterRaftConfig), consensus2.NewClusterRaftConfig(&cfg.Raft)), + Override(new(*consensus2.Consensus), consensus2.NewConsensusWithRPCClient(false)), Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus), Override(new(messagesigner.MsgSigner), From(new(*messagesigner.MessageSignerConsensus))), Override(new(*modules.RPCHandler), modules.NewRPCHandler), diff --git a/node/config/def.go b/node/config/def.go index 36fb05c95..90a65778d 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -7,7 +7,6 @@ import ( "time" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -313,7 +312,7 @@ var ( func DefaultUserRaftConfig() *UserRaftConfig { var cfg UserRaftConfig 
cfg.DataFolder = "" // empty so it gets omitted - cfg.InitPeerset = []peer.ID{} + cfg.InitPeersetMultiAddr = []string{} cfg.WaitForLeaderTimeout = Duration(DefaultWaitForLeaderTimeout) cfg.NetworkTimeout = Duration(DefaultNetworkTimeout) cfg.CommitRetries = DefaultCommitRetries diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index ca62cd4d5..8d312b9b6 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -1243,10 +1243,10 @@ finalities beyond the compaction boundary, default is 0, -1 retains everything`, Comment: `A folder to store Raft's data.`, }, { - Name: "InitPeerset", - Type: "[]peer.ID", + Name: "InitPeersetMultiAddr", + Type: "[]string", - Comment: `InitPeerset provides the list of initial cluster peers for new Raft + Comment: `InitPeersetMultiAddr provides the list of initial cluster peers for new Raft peers (with no prior state). It is ignored when Raft was already initialized or when starting in staging mode.`, }, diff --git a/node/config/types.go b/node/config/types.go index 1b0963c55..106c594e7 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -2,7 +2,6 @@ package config import ( "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/storage/sealer" @@ -654,10 +653,10 @@ type UserRaftConfig struct { HostShutdown bool // A folder to store Raft's data. DataFolder string - // InitPeerset provides the list of initial cluster peers for new Raft + // InitPeersetMultiAddr provides the list of initial cluster peers for new Raft // peers (with no prior state). It is ignored when Raft was already // initialized or when starting in staging mode. - InitPeerset []peer.ID + InitPeersetMultiAddr []string // LeaderTimeout specifies how long to wait for a leader before // failing an operation. WaitForLeaderTimeout Duration