Use multiaddrs in config for raft peerset

Shrenuj Bansal 2022-09-30 16:45:04 +00:00
parent b8060cd8f7
commit 986c5e3c68
10 changed files with 83 additions and 32 deletions

View File

@@ -13,6 +13,7 @@ import (
     "github.com/filecoin-project/go-address"
+    "github.com/filecoin-project/lotus/chain/messagepool"
     "github.com/filecoin-project/lotus/chain/types"
     "github.com/filecoin-project/lotus/chain/wallet"
 )
@@ -22,7 +23,7 @@ type mockMpool struct {
     nonces map[address.Address]uint64
 }
-var _ MpoolNonceAPI = (*mockMpool)(nil)
+var _ messagepool.MpoolNonceAPI = (*mockMpool)(nil)
 func newMockMpool() *mockMpool {
     return &mockMpool{nonces: make(map[address.Address]uint64)}
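The blank-identifier assignment changed above is Go's compile-time interface assertion; since the interface now lives in the messagepool package, the check must name it there. A self-contained illustration of the idiom (Iface and impl are made-up stand-ins for messagepool.MpoolNonceAPI and mockMpool):

package main

type Iface interface{ Get() int }

type impl struct{ v int }

func (i *impl) Get() int { return i.v }

// Compile-time assertion: the build breaks if *impl ever stops
// satisfying Iface, at zero runtime cost.
var _ Iface = (*impl)(nil)

func main() {}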

View File

@@ -245,13 +245,13 @@
 # env var: LOTUS_RAFT_DATAFOLDER
 #DataFolder = ""
-# InitPeerset provides the list of initial cluster peers for new Raft
+# InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
 # peers (with no prior state). It is ignored when Raft was already
 # initialized or when starting in staging mode.
 #
-# type: []peer.ID
-# env var: LOTUS_RAFT_INITPEERSET
-#InitPeerset = []
+# type: []string
+# env var: LOTUS_RAFT_INITPEERSETMULTIADDR
+#InitPeersetMultiAddr = []
 # LeaderTimeout specifies how long to wait for a leader before
 # failing an operation.
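For illustration only (not part of the commit), a populated peerset in the new format holds multiaddr strings rather than bare peer IDs. The peer IDs and addresses below are made-up placeholders; an ID-only "/p2p/<id>" entry also works, as the itest below shows:

[Raft]
  InitPeersetMultiAddr = [
    "/ip4/10.0.1.10/tcp/1347/p2p/12D3KooWPlaceholderPeerA",
    "/p2p/12D3KooWPlaceholderPeerB",
  ]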

View File

@@ -45,7 +45,6 @@ func generatePrivKey() (*kit.Libp2p, error) {
 func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *api.RaftStateData {
     raftState, err := node.RaftState(ctx)
     require.NoError(t, err)
-    //rstate := raftState.(*consensus.RaftState)
     return raftState
 }
@@ -57,7 +56,13 @@ func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *ki
     pkey1, _ := generatePrivKey()
     pkey2, _ := generatePrivKey()
-    initPeerSet := []peer.ID{pkey0.PeerID, pkey1.PeerID, pkey2.PeerID}
+    pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2}
+    initPeerSet := []string{}
+    for _, pkey := range pkeys {
+        initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String())
+    }
+    //initPeerSet := []peer.ID{pkey0.PeerID, pkey1.PeerID, pkey2.PeerID}
     raftOps := kit.ConstructorOpts(
         node.Override(new(*gorpc.Client), modules.NewRPCClient),
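The "/p2p/<peer-id>" strings built above are ID-only multiaddrs: they name a peer without pinning a transport address. A minimal sketch of the round trip the new config format relies on, assuming lotus' addrutil helper (the function itself is illustrative, not part of the commit):

import (
    "context"

    "github.com/filecoin-project/lotus/lib/addrutil"
    "github.com/libp2p/go-libp2p/core/peer"
)

// peerIDsFromMultiaddrs resolves configured strings such as "/p2p/<id>" or
// "/ip4/1.2.3.4/tcp/1347/p2p/<id>" back into raft peer IDs.
func peerIDsFromMultiaddrs(ctx context.Context, addrs []string) ([]peer.ID, error) {
    infos, err := addrutil.ParseAddresses(ctx, addrs) // also resolves DNS multiaddrs
    if err != nil {
        return nil, err
    }
    ids := make([]peer.ID, 0, len(infos))
    for _, ai := range infos {
        ids = append(ids, ai.ID)
    }
    return ids, nil
}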

View File

@@ -1,16 +1,15 @@
 package consensus
 import (
-    "github.com/filecoin-project/lotus/node/repo"
     "io/ioutil"
     "path/filepath"
     "time"
     hraft "github.com/hashicorp/raft"
-    "github.com/libp2p/go-libp2p/core/peer"
     "golang.org/x/xerrors"
     "github.com/filecoin-project/lotus/node/config"
+    "github.com/filecoin-project/lotus/node/repo"
 )
 // ConfigKey is the default configuration key for holding this component's
@@ -40,7 +39,7 @@ type ClusterRaftConfig struct {
     // InitPeerset provides the list of initial cluster peers for new Raft
     // peers (with no prior state). It is ignored when Raft was already
     // initialized or when starting in staging mode.
-    InitPeerset []peer.ID
+    InitPeerset []string
     // LeaderTimeout specifies how long to wait for a leader before
     // failing an operation.
     WaitForLeaderTimeout time.Duration
@@ -68,7 +67,7 @@ type ClusterRaftConfig struct {
 func DefaultClusterRaftConfig() *ClusterRaftConfig {
     var cfg ClusterRaftConfig
     cfg.DataFolder = "" // empty so it gets omitted
-    cfg.InitPeerset = []peer.ID{}
+    cfg.InitPeerset = []string{}
     cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout
     cfg.NetworkTimeout = DefaultNetworkTimeout
     cfg.CommitRetries = DefaultCommitRetries
@@ -90,7 +89,7 @@ func DefaultClusterRaftConfig() *ClusterRaftConfig {
 func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftConfig {
     var cfg ClusterRaftConfig
     cfg.DataFolder = userRaftConfig.DataFolder
-    cfg.InitPeerset = userRaftConfig.InitPeerset
+    cfg.InitPeerset = userRaftConfig.InitPeersetMultiAddr
     cfg.WaitForLeaderTimeout = time.Duration(userRaftConfig.WaitForLeaderTimeout)
     cfg.NetworkTimeout = time.Duration(userRaftConfig.NetworkTimeout)
     cfg.CommitRetries = userRaftConfig.CommitRetries
@@ -108,6 +107,15 @@ func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftCon
     // Set up logging
     cfg.RaftConfig.LogOutput = ioutil.Discard
     //cfg.RaftConfig.Logger = &hcLogToLogger{}
+    //addrInfos, err := addrutil.ParseAddresses(context.Background(), userRaftConfig.InitPeersetMultiAddr)
+    //if err != nil {
+    //    return nil
+    //}
+    //for _, addrInfo := range addrInfos {
+    //    cfg.InitPeerset = append(cfg.InitPeerset, addrInfo.ID)
+    //}
     return &cfg
 }
@@ -355,9 +363,9 @@ func ValidateConfig(cfg *ClusterRaftConfig) error {
 // GetDataFolder returns the Raft data folder that we are using.
 func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string {
     if cfg.DataFolder == "" {
-        return filepath.Join(repo.Path() + DefaultDataSubFolder)
+        return filepath.Join(repo.Path(), DefaultDataSubFolder)
     }
-    return filepath.Join(repo.Path() + cfg.DataFolder)
+    return filepath.Join(repo.Path(), cfg.DataFolder)
 }
 //
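The hunk above also fixes a subtle path bug: with "+", Join receives a single pre-concatenated argument and never inserts a separator. A quick runnable sketch (the paths and the "raft" subfolder value are illustrative):

package main

import (
    "fmt"
    "path/filepath"
)

func main() {
    repoPath := "/home/user/.lotus" // illustrative repo path
    sub := "raft"                   // stand-in for DefaultDataSubFolder

    // Old form: concatenation happens before Join sees the values,
    // so no separator appears between the two components.
    fmt.Println(filepath.Join(repoPath + sub)) // /home/user/.lotusraft

    // Fixed form: Join gets two components and joins them correctly.
    fmt.Println(filepath.Join(repoPath, sub)) // /home/user/.lotus/raft
}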

View File

@@ -6,7 +6,7 @@ import (
     "context"
     "errors"
     "fmt"
-    "github.com/filecoin-project/lotus/node/repo"
+    "github.com/filecoin-project/lotus/lib/addrutil"
     "sort"
     "time"
@@ -18,6 +18,7 @@ import (
     "github.com/filecoin-project/lotus/api"
     "github.com/filecoin-project/lotus/chain/messagepool"
     "github.com/filecoin-project/lotus/chain/types"
+    "github.com/filecoin-project/lotus/node/repo"
     //ds "github.com/ipfs/go-datastore"
     logging "github.com/ipfs/go-log/v2"
@@ -36,6 +37,20 @@ var logger = logging.Logger("raft")
 type RaftState struct {
     NonceMap api.NonceMapType
     MsgUuids api.MsgUuidMapType
+    // We need a reference to the messagepool in the raft state in order to
+    // sync messages that have been sent by the leader node.
+    // The miner calls StateWaitMsg after MpoolPushMessage to check whether the
+    // message has landed on chain. This check requires the message to be stored
+    // in the local chainstore. If a leader node goes down after sending a
+    // message to the chain and is replaced by another node, that node needs to
+    // have the message in its chainstore for the above check to succeed.
+    // This matters because the miner only stores signed CIDs, but the message
+    // received in a block will be unsigned (for BLS). Hence, the process relies
+    // on the node storing the signed message, which holds a copy of the
+    // unsigned message, to properly perform all the needed checks.
     Mpool *messagepool.MessagePool
 }
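A sketch of the miner-side flow that the new comment describes, written against (approximately) the v1 FullNode API; the helper name is made up, and the confidence/lookback arguments are plausible defaults rather than values taken from this commit:

import (
    "context"

    "github.com/filecoin-project/lotus/api"
    "github.com/filecoin-project/lotus/build"
    "github.com/filecoin-project/lotus/chain/types"
)

// pushAndWait is a hypothetical helper, not part of this commit.
func pushAndWait(ctx context.Context, node api.FullNode, msg *types.Message) error {
    smsg, err := node.MpoolPushMessage(ctx, msg, nil)
    if err != nil {
        return err
    }
    // StateWaitMsg resolves the message through the local chainstore. If a
    // freshly promoted leader never stored smsg, the lookup cannot relate
    // the unsigned on-chain (BLS) variant back to the signed one and fails.
    _, err = node.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence, api.LookbackNoLimit, true)
    return err
}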
@@ -120,6 +135,15 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes
     actor := libp2praft.NewActor(raft.raft)
     consensus.SetActor(actor)
+    peers := []peer.ID{}
+    addrInfos, err := addrutil.ParseAddresses(ctx, cfg.InitPeerset)
+    for _, addrInfo := range addrInfos {
+        peers = append(peers, addrInfo.ID)
+        // Add peer to address book
+        host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Duration(time.Hour*100))
+    }
     cc := &Consensus{
         ctx:    ctx,
         cancel: cancel,
@@ -129,7 +153,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes
         actor:   actor,
         state:   state,
         raft:    raft,
-        peerSet: cfg.InitPeerset,
+        peerSet: peers,
         rpcReady: make(chan struct{}, 1),
         readyCh:  make(chan struct{}, 1),
         repo:     repo,

View File

@@ -4,9 +4,7 @@ import (
     "context"
     "errors"
     "fmt"
-    "github.com/filecoin-project/lotus/node/repo"
-    "github.com/ipfs/go-log/v2"
-    "go.uber.org/zap"
+    "github.com/filecoin-project/lotus/lib/addrutil"
     "io"
     "os"
     "path/filepath"
@@ -14,9 +12,13 @@ import (
     hraft "github.com/hashicorp/raft"
     raftboltdb "github.com/hashicorp/raft-boltdb"
+    "github.com/ipfs/go-log/v2"
     p2praft "github.com/libp2p/go-libp2p-raft"
     host "github.com/libp2p/go-libp2p/core/host"
     peer "github.com/libp2p/go-libp2p/core/peer"
+    "go.uber.org/zap"
+    "github.com/filecoin-project/lotus/node/repo"
 )
 var raftLogger = log.Logger("raft-cluster")
@@ -86,7 +88,10 @@ func newRaftWrapper(
         return nil, err
     }
-    raftW.makeServerConfig()
+    err = raftW.makeServerConfig()
+    if err != nil {
+        return nil, err
+    }
     err = raftW.makeTransport()
     if err != nil {
@@ -225,8 +230,17 @@ func (rw *raftWrapper) Bootstrap() (bool, error) {
 // create Raft servers configuration. The result is used
 // by Bootstrap() when it proceeds to Bootstrap.
-func (rw *raftWrapper) makeServerConfig() {
-    rw.serverConfig = makeServerConf(append(rw.config.InitPeerset, rw.host.ID()))
+func (rw *raftWrapper) makeServerConfig() error {
+    peers := []peer.ID{}
+    addrInfos, err := addrutil.ParseAddresses(context.Background(), rw.config.InitPeerset)
+    if err != nil {
+        return err
+    }
+    for _, addrInfo := range addrInfos {
+        peers = append(peers, addrInfo.ID)
+    }
+    rw.serverConfig = makeServerConf(append(peers, rw.host.ID()))
+    return nil
 }
 // creates a server configuration with all peers as Voters.
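makeServerConf itself is only named in this hunk; a plausible reconstruction, assumed from the "all peers as Voters" comment above rather than taken from the diff: each deduplicated peer becomes a hashicorp/raft Voter, with the peer ID doubling as server ID and address, since the libp2p transport dials by peer ID.

// Sketch only; the real lotus implementation may differ in detail.
func makeServerConf(peers []peer.ID) hraft.Configuration {
    servers := make([]hraft.Server, 0, len(peers))
    seen := make(map[peer.ID]struct{}, len(peers))
    for _, p := range peers {
        if _, ok := seen[p]; ok {
            continue // skip duplicates so a peer is not registered twice
        }
        seen[p] = struct{}{}
        servers = append(servers, hraft.Server{
            Suffrage: hraft.Voter,
            ID:       hraft.ServerID(p.String()),
            Address:  hraft.ServerAddress(p.String()),
        })
    }
    return hraft.Configuration{Servers: servers}
}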

View File

@@ -242,7 +242,8 @@ func ConfigFullNode(c interface{}) Option {
     // Chain node cluster enabled
     If(cfg.Raft.ClusterModeEnabled,
         Override(new(*gorpc.Client), modules.NewRPCClient),
-        Override(new(*consensus.Consensus), consensus2.NewConsensusWithRPCClient(false)),
+        Override(new(*consensus2.ClusterRaftConfig), consensus2.NewClusterRaftConfig(&cfg.Raft)),
+        Override(new(*consensus2.Consensus), consensus2.NewConsensusWithRPCClient(false)),
         Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus),
         Override(new(messagesigner.MsgSigner), From(new(*messagesigner.MessageSignerConsensus))),
         Override(new(*modules.RPCHandler), modules.NewRPCHandler),

View File

@@ -7,7 +7,6 @@ import (
     "time"
     "github.com/ipfs/go-cid"
-    "github.com/libp2p/go-libp2p/core/peer"
     "github.com/filecoin-project/go-state-types/abi"
     "github.com/filecoin-project/go-state-types/big"
@@ -313,7 +312,7 @@ var (
 func DefaultUserRaftConfig() *UserRaftConfig {
     var cfg UserRaftConfig
     cfg.DataFolder = "" // empty so it gets omitted
-    cfg.InitPeerset = []peer.ID{}
+    cfg.InitPeersetMultiAddr = []string{}
     cfg.WaitForLeaderTimeout = Duration(DefaultWaitForLeaderTimeout)
     cfg.NetworkTimeout = Duration(DefaultNetworkTimeout)
     cfg.CommitRetries = DefaultCommitRetries

View File

@@ -1243,10 +1243,10 @@ finalities beyond the compaction boundary, default is 0, -1 retains everything`,
             Comment: `A folder to store Raft's data.`,
         },
         {
-            Name: "InitPeerset",
-            Type: "[]peer.ID",
-            Comment: `InitPeerset provides the list of initial cluster peers for new Raft
+            Name: "InitPeersetMultiAddr",
+            Type: "[]string",
+            Comment: `InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
 peers (with no prior state). It is ignored when Raft was already
 initialized or when starting in staging mode.`,
         },

View File

@@ -2,7 +2,6 @@ package config
 import (
     "github.com/ipfs/go-cid"
-    "github.com/libp2p/go-libp2p/core/peer"
     "github.com/filecoin-project/lotus/chain/types"
     "github.com/filecoin-project/lotus/storage/sealer"
@@ -654,10 +653,10 @@ type UserRaftConfig struct {
     HostShutdown bool
     // A folder to store Raft's data.
     DataFolder string
-    // InitPeerset provides the list of initial cluster peers for new Raft
+    // InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
     // peers (with no prior state). It is ignored when Raft was already
     // initialized or when starting in staging mode.
-    InitPeerset []peer.ID
+    InitPeersetMultiAddr []string
     // LeaderTimeout specifies how long to wait for a leader before
     // failing an operation.
     WaitForLeaderTimeout Duration