diff --git a/.circleci/config.yml b/.circleci/config.yml
index 20701f7d5..7987060d4 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -858,12 +858,6 @@ workflows:
         - build
       suite: itest-pending_deal_allocation
       target: "./itests/pending_deal_allocation_test.go"
-  - test:
-      name: test-itest-raft_messagesigner
-      requires:
-        - build
-      suite: itest-raft_messagesigner
-      target: "./itests/raft_messagesigner_test.go"
   - test:
       name: test-itest-remove_verifreg_datacap
       requires:
diff --git a/api/api_full.go b/api/api_full.go
index 4ae2ea531..3dc7f8bb2 100644
--- a/api/api_full.go
+++ b/api/api_full.go
@@ -879,9 +879,6 @@ type FullNode interface {
 	// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
 	// the path specified when calling CreateBackup is within the base path
 	CreateBackup(ctx context.Context, fpath string) error //perm:admin
-
-	RaftState(ctx context.Context) (*RaftStateData, error) //perm:read
-	RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
 }
 
 // reverse interface to the client, called after EthSubscribe
diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go
index 018629600..70024d3db 100644
--- a/api/docgen/docgen.go
+++ b/api/docgen/docgen.go
@@ -355,10 +355,6 @@ func init() {
 	addExample(map[string]bitfield.BitField{
 		"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
 	})
-	addExample(&api.RaftStateData{
-		NonceMap: make(map[address.Address]uint64),
-		MsgUuids: make(map[uuid.UUID]*types.SignedMessage),
-	})
 
 	addExample(http.Header{
 		"Authorization": []string{"Bearer ey.."},
diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go
index 92b719550..ed9fe740e 100644
--- a/api/mocks/mock_full.go
+++ b/api/mocks/mock_full.go
@@ -2919,36 +2919,6 @@ func (mr *MockFullNodeMockRecorder) PaychVoucherSubmit(arg0, arg1, arg2, arg3, a
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychVoucherSubmit", reflect.TypeOf((*MockFullNode)(nil).PaychVoucherSubmit), arg0, arg1, arg2, arg3, arg4)
 }
 
-// RaftLeader mocks base method.
-func (m *MockFullNode) RaftLeader(arg0 context.Context) (peer.ID, error) {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "RaftLeader", arg0)
-	ret0, _ := ret[0].(peer.ID)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
-}
-
-// RaftLeader indicates an expected call of RaftLeader.
-func (mr *MockFullNodeMockRecorder) RaftLeader(arg0 interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RaftLeader", reflect.TypeOf((*MockFullNode)(nil).RaftLeader), arg0)
-}
-
-// RaftState mocks base method.
-func (m *MockFullNode) RaftState(arg0 context.Context) (*api.RaftStateData, error) {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "RaftState", arg0)
-	ret0, _ := ret[0].(*api.RaftStateData)
-	ret1, _ := ret[1].(error)
-	return ret0, ret1
-}
-
-// RaftState indicates an expected call of RaftState.
-func (mr *MockFullNodeMockRecorder) RaftState(arg0 interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RaftState", reflect.TypeOf((*MockFullNode)(nil).RaftState), arg0)
-}
-
 // Session mocks base method.
func (m *MockFullNode) Session(arg0 context.Context) (uuid.UUID, error) { m.ctrl.T.Helper() diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 8adcbc189..589ae8f56 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -453,10 +453,6 @@ type FullNodeMethods struct { PaychVoucherSubmit func(p0 context.Context, p1 address.Address, p2 *paych.SignedVoucher, p3 []byte, p4 []byte) (cid.Cid, error) `perm:"sign"` - RaftLeader func(p0 context.Context) (peer.ID, error) `perm:"read"` - - RaftState func(p0 context.Context) (*RaftStateData, error) `perm:"read"` - StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"` StateActorCodeCIDs func(p0 context.Context, p1 abinetwork.Version) (map[string]cid.Cid, error) `perm:"read"` @@ -3223,28 +3219,6 @@ func (s *FullNodeStub) PaychVoucherSubmit(p0 context.Context, p1 address.Address return *new(cid.Cid), ErrNotSupported } -func (s *FullNodeStruct) RaftLeader(p0 context.Context) (peer.ID, error) { - if s.Internal.RaftLeader == nil { - return *new(peer.ID), ErrNotSupported - } - return s.Internal.RaftLeader(p0) -} - -func (s *FullNodeStub) RaftLeader(p0 context.Context) (peer.ID, error) { - return *new(peer.ID), ErrNotSupported -} - -func (s *FullNodeStruct) RaftState(p0 context.Context) (*RaftStateData, error) { - if s.Internal.RaftState == nil { - return nil, ErrNotSupported - } - return s.Internal.RaftState(p0) -} - -func (s *FullNodeStub) RaftState(p0 context.Context) (*RaftStateData, error) { - return nil, ErrNotSupported -} - func (s *FullNodeStruct) StateAccountKey(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) { if s.Internal.StateAccountKey == nil { return *new(address.Address), ErrNotSupported diff --git a/api/types.go b/api/types.go index 93ed4083f..7fd607750 100644 --- a/api/types.go +++ b/api/types.go @@ -69,11 +69,6 @@ type MessageSendSpec struct { MaximizeFeeCap bool } -type MpoolMessageWhole struct { - Msg *types.Message - Spec *MessageSendSpec -} - // GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync type GraphSyncDataTransfer struct { // GraphSync request id for this transfer @@ -351,64 +346,6 @@ type ForkUpgradeParams struct { UpgradeWatermelonHeight abi.ChainEpoch } -type NonceMapType map[address.Address]uint64 -type MsgUuidMapType map[uuid.UUID]*types.SignedMessage - -type RaftStateData struct { - NonceMap NonceMapType - MsgUuids MsgUuidMapType -} - -func (n *NonceMapType) MarshalJSON() ([]byte, error) { - marshalled := make(map[string]uint64) - for a, n := range *n { - marshalled[a.String()] = n - } - return json.Marshal(marshalled) -} - -func (n *NonceMapType) UnmarshalJSON(b []byte) error { - unmarshalled := make(map[string]uint64) - err := json.Unmarshal(b, &unmarshalled) - if err != nil { - return err - } - *n = make(map[address.Address]uint64) - for saddr, nonce := range unmarshalled { - a, err := address.NewFromString(saddr) - if err != nil { - return err - } - (*n)[a] = nonce - } - return nil -} - -func (m *MsgUuidMapType) MarshalJSON() ([]byte, error) { - marshalled := make(map[string]*types.SignedMessage) - for u, msg := range *m { - marshalled[u.String()] = msg - } - return json.Marshal(marshalled) -} - -func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error { - unmarshalled := make(map[string]*types.SignedMessage) - err := json.Unmarshal(b, &unmarshalled) - if err != nil { - return err - } - *m = make(map[uuid.UUID]*types.SignedMessage) - for suid, msg := range unmarshalled { - u, err 
:= uuid.Parse(suid) - if err != nil { - return err - } - (*m)[u] = msg - } - return nil -} - // ChainExportConfig holds configuration for chain ranged exports. type ChainExportConfig struct { WriteBufferSize int diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 994266492..3aa6bc16b 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 2a3e5c939..64765a403 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 6a82cdd95..4702c5867 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 2e6e690e5..ad518528c 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/messagesigner/messagesigner_consensus.go b/chain/messagesigner/messagesigner_consensus.go deleted file mode 100644 index 905bb7199..000000000 --- a/chain/messagesigner/messagesigner_consensus.go +++ /dev/null @@ -1,98 +0,0 @@ -package messagesigner - -import ( - "context" - - "github.com/google/uuid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - "github.com/libp2p/go-libp2p/core/peer" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/messagepool" - "github.com/filecoin-project/lotus/chain/types" - consensus "github.com/filecoin-project/lotus/lib/consensus/raft" - "github.com/filecoin-project/lotus/node/modules/dtypes" -) - -type MessageSignerConsensus struct { - MsgSigner - Consensus *consensus.Consensus -} - -func NewMessageSignerConsensus( - wallet api.Wallet, - mpool messagepool.MpoolNonceAPI, - ds dtypes.MetadataDS, - consensus *consensus.Consensus) *MessageSignerConsensus { - - ds = namespace.Wrap(ds, datastore.NewKey("/message-signer-consensus/")) - return &MessageSignerConsensus{ - MsgSigner: &MessageSigner{ - wallet: wallet, - mpool: mpool, - ds: ds, - }, - Consensus: consensus, - } -} - -func (ms *MessageSignerConsensus) IsLeader(ctx context.Context) bool { - return ms.Consensus.IsLeader(ctx) -} - -func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { - ok, err := ms.Consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage)) - if err != nil { - return ok, err - } - return ok, nil -} - -func (ms *MessageSignerConsensus) SignMessage( - ctx context.Context, - msg *types.Message, - spec *api.MessageSendSpec, - cb func(*types.SignedMessage) error) (*types.SignedMessage, error) { - - signedMsg, err := ms.MsgSigner.SignMessage(ctx, msg, spec, cb) - if err != nil { - return nil, err - } - - op := &consensus.ConsensusOp{ - Nonce: signedMsg.Message.Nonce, - Uuid: spec.MsgUuid, - Addr: signedMsg.Message.From, - SignedMsg: signedMsg, - } - err = ms.Consensus.Commit(ctx, op) - if err != nil { - return nil, err - } - - return signedMsg, nil -} - -func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) { - cstate, err := ms.Consensus.State(ctx) - if err != nil { - return nil, err - } - - //cstate := state.(Consensus.RaftState) - msg, ok := cstate.MsgUuids[uuid] - if !ok { - return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid) - } - return msg, nil -} - -func (ms 
*MessageSignerConsensus) GetRaftState(ctx context.Context) (*consensus.RaftState, error) {
-	return ms.Consensus.State(ctx)
-}
-
-func (ms *MessageSignerConsensus) Leader(ctx context.Context) (peer.ID, error) {
-	return ms.Consensus.Leader(ctx)
-}
diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md
index c2929f0f5..53b5ddbe6 100644
--- a/documentation/en/api-v1-unstable-methods.md
+++ b/documentation/en/api-v1-unstable-methods.md
@@ -211,9 +211,6 @@
 * [PaychVoucherCreate](#PaychVoucherCreate)
 * [PaychVoucherList](#PaychVoucherList)
 * [PaychVoucherSubmit](#PaychVoucherSubmit)
-* [Raft](#Raft)
-  * [RaftLeader](#RaftLeader)
-  * [RaftState](#RaftState)
 * [Start](#Start)
   * [StartTime](#StartTime)
 * [State](#State)
@@ -6182,33 +6179,6 @@ Response:
 }
 ```
 
-## Raft
-
-
-### RaftLeader
-
-
-Perms: read
-
-Inputs: `null`
-
-Response: `"12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf"`
-
-### RaftState
-
-
-Perms: read
-
-Inputs: `null`
-
-Response:
-```json
-{
-  "NonceMap": {},
-  "MsgUuids": {}
-}
-```
-
 ## Start
 
 
diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml
index 1f143b896..9f9836bc0 100644
--- a/documentation/en/default-lotus-config.toml
+++ b/documentation/en/default-lotus-config.toml
@@ -260,68 +260,6 @@
   #HotstoreMaxSpaceSafetyBuffer = 50000000000
 
 
-[Cluster]
-  # EXPERIMENTAL. config to enabled node cluster with raft consensus
-  #
-  # type: bool
-  # env var: LOTUS_CLUSTER_CLUSTERMODEENABLED
-  #ClusterModeEnabled = false
-
-  # A folder to store Raft's data.
-  #
-  # type: string
-  # env var: LOTUS_CLUSTER_DATAFOLDER
-  #DataFolder = ""
-
-  # InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
-  # peers (with no prior state). It is ignored when Raft was already
-  # initialized or when starting in staging mode.
-  #
-  # type: []string
-  # env var: LOTUS_CLUSTER_INITPEERSETMULTIADDR
-  #InitPeersetMultiAddr = []
-
-  # LeaderTimeout specifies how long to wait for a leader before
-  # failing an operation.
-  #
-  # type: Duration
-  # env var: LOTUS_CLUSTER_WAITFORLEADERTIMEOUT
-  #WaitForLeaderTimeout = "15s"
-
-  # NetworkTimeout specifies how long before a Raft network
-  # operation is timed out
-  #
-  # type: Duration
-  # env var: LOTUS_CLUSTER_NETWORKTIMEOUT
-  #NetworkTimeout = "1m40s"
-
-  # CommitRetries specifies how many times we retry a failed commit until
-  # we give up.
-  #
-  # type: int
-  # env var: LOTUS_CLUSTER_COMMITRETRIES
-  #CommitRetries = 1
-
-  # How long to wait between retries
-  #
-  # type: Duration
-  # env var: LOTUS_CLUSTER_COMMITRETRYDELAY
-  #CommitRetryDelay = "200ms"
-
-  # BackupsRotate specifies the maximum number of Raft's DataFolder
-  # copies that we keep as backups (renaming) after cleanup.
-  #
-  # type: int
-  # env var: LOTUS_CLUSTER_BACKUPSROTATE
-  #BackupsRotate = 6
-
-  # Tracing enables propagation of contexts across binary boundaries.
-  #
-  # type: bool
-  # env var: LOTUS_CLUSTER_TRACING
-  #Tracing = false
-
-
 [Fevm]
   # EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids.
   # This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
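The `RaftState` response documented above (`"NonceMap": {}`, `"MsgUuids": {}`) relied on the custom marshallers deleted from `api/types.go`: `encoding/json` only accepts map keys that are strings, integers, or implement `encoding.TextMarshaler`, so `NonceMapType` and `MsgUuidMapType` converted their `address.Address` and `uuid.UUID` keys to and from strings by hand. A self-contained sketch of that pattern; the `Key` type and `ParseKey` helper are illustrative stand-ins, not Lotus or go-address APIs:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Key stands in for a type like address.Address whose values are not
// directly usable as JSON object keys.
type Key struct{ inner string }

func (k Key) String() string { return k.inner }

// ParseKey stands in for address.NewFromString / uuid.Parse.
func ParseKey(s string) (Key, error) { return Key{inner: s}, nil }

// NonceMap mirrors the removed api.NonceMapType: marshal by converting
// each key to its string form, unmarshal by parsing it back.
type NonceMap map[Key]uint64

func (n NonceMap) MarshalJSON() ([]byte, error) {
	out := make(map[string]uint64, len(n))
	for k, v := range n {
		out[k.String()] = v
	}
	return json.Marshal(out)
}

func (n *NonceMap) UnmarshalJSON(b []byte) error {
	var raw map[string]uint64
	if err := json.Unmarshal(b, &raw); err != nil {
		return err
	}
	*n = make(NonceMap, len(raw))
	for s, v := range raw {
		k, err := ParseKey(s)
		if err != nil {
			return err
		}
		(*n)[k] = v
	}
	return nil
}

func main() {
	m := NonceMap{{inner: "f1abc"}: 42}
	b, _ := json.Marshal(m)
	fmt.Println(string(b)) // {"f1abc":42}

	var back NonceMap
	_ = json.Unmarshal(b, &back)
	fmt.Println(back[Key{inner: "f1abc"}]) // 42
}
```

The same string-keyed round trip was what produced the `{}` placeholders in the removed OpenRPC docs above.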
diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 45f21786d..5a84e4143 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -688,8 +688,6 @@ func (n *Ensemble) Start() *Ensemble { copy.FullNode = modules.MakeUuidWrapper(copy.FullNode) m.FullNode = © - //m.FullNode.FullNode = modules.MakeUuidWrapper(fn.FullNode) - opts := []node.Option{ node.StorageMiner(&m.StorageMiner, cfg.Subsystems), node.Base(), @@ -697,8 +695,6 @@ func (n *Ensemble) Start() *Ensemble { node.Test(), node.If(m.options.disableLibp2p, node.MockHost(n.mn)), - //node.Override(new(v1api.RawFullNodeAPI), func() api.FullNode { return modules.MakeUuidWrapper(m.FullNode) }), - //node.Override(new(v1api.RawFullNodeAPI), modules.MakeUuidWrapper), node.Override(new(v1api.RawFullNodeAPI), m.FullNode), node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)), diff --git a/itests/raft_messagesigner_test.go b/itests/raft_messagesigner_test.go deleted file mode 100644 index 220da9699..000000000 --- a/itests/raft_messagesigner_test.go +++ /dev/null @@ -1,577 +0,0 @@ -package itests - -import ( - "context" - "crypto/rand" - "fmt" - "reflect" - "testing" - "time" - - "github.com/google/uuid" - gorpc "github.com/libp2p/go-libp2p-gorpc" - libp2pcrypto "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-state-types/exitcode" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/messagesigner" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/itests/kit" - consensus "github.com/filecoin-project/lotus/lib/consensus/raft" - "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/impl" - "github.com/filecoin-project/lotus/node/modules" -) - -func generatePrivKey() (*kit.Libp2p, error) { - privkey, _, err := libp2pcrypto.GenerateEd25519Key(rand.Reader) - if err != nil { - return nil, err - } - - peerId, err := peer.IDFromPrivateKey(privkey) - if err != nil { - return nil, err - } - - return &kit.Libp2p{PeerID: peerId, PrivKey: privkey}, nil -} - -func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *api.RaftStateData { - raftState, err := node.RaftState(ctx) - require.NoError(t, err) - return raftState -} - -func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *kit.TestFullNode, node2 *kit.TestFullNode, miner *kit.TestMiner) *kit.Ensemble { - - blockTime := 1 * time.Second - - pkey0, _ := generatePrivKey() - pkey1, _ := generatePrivKey() - pkey2, _ := generatePrivKey() - - pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2} - initPeerSet := []string{} - for _, pkey := range pkeys { - initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String()) - } - - //initPeerSet := []peer.ID{pkey0.PeerID, pkey1.PeerID, pkey2.PeerID} - - raftOps := kit.ConstructorOpts( - node.Override(new(*gorpc.Client), modules.NewRPCClient), - node.Override(new(*consensus.ClusterRaftConfig), func() *consensus.ClusterRaftConfig { - cfg := consensus.DefaultClusterRaftConfig() - cfg.InitPeerset = initPeerSet - return cfg - }), - node.Override(new(*consensus.Consensus), consensus.NewConsensusWithRPCClient(false)), - node.Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus), - 
node.Override(new(messagesigner.MsgSigner), func(ms *messagesigner.MessageSignerConsensus) *messagesigner.MessageSignerConsensus { return ms }), - node.Override(new(*modules.RPCHandler), modules.NewRPCHandler), - node.Override(node.GoRPCServer, modules.NewRPCServer), - ) - //raftOps := kit.ConstructorOpts() - - ens := kit.NewEnsemble(t).FullNode(node0, raftOps, kit.ThroughRPC()).FullNode(node1, raftOps, kit.ThroughRPC()).FullNode(node2, raftOps, kit.ThroughRPC()) - node0.AssignPrivKey(pkey0) - node1.AssignPrivKey(pkey1) - node2.AssignPrivKey(pkey2) - - nodes := []*kit.TestFullNode{node0, node1, node2} - wrappedFullNode := kit.MergeFullNodes(nodes) - - ens.MinerEnroll(miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC()) - ens.Start() - - // Import miner wallet to all nodes - addr0, err := node0.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - addr1, err := node1.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - addr2, err := node2.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - - fmt.Println(addr0, addr1, addr2) - - ens.InterconnectAll() - - ens.AddInactiveMiner(miner) - ens.Start() - - ens.InterconnectAll().BeginMining(blockTime) - - return ens -} - -func TestRaftState(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - setup(ctx, t, &node0, &node1, &node2, &miner) - - fmt.Println(node0.WalletList(context.Background())) - fmt.Println(node1.WalletList(context.Background())) - fmt.Println(node2.WalletList(context.Background())) - - bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) - require.NoError(t, err) - - msgHalfBal := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.Div(bal, big.NewInt(2)), - } - - mu := uuid.New() - smHalfBal, err := node0.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ - MsgUuid: mu, - }) - require.NoError(t, err) - mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate0 := getRaftState(ctx, t, &node0) - rstate1 := getRaftState(ctx, t, &node1) - rstate2 := getRaftState(ctx, t, &node2) - - require.EqualValues(t, rstate0, rstate1) - require.EqualValues(t, rstate0, rstate2) -} - -func TestRaftStateLeaderDisconnects(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - nodes := []*kit.TestFullNode{&node0, &node1, &node2} - - setup(ctx, t, &node0, &node1, &node2, &miner) - - peerToNode := make(map[peer.ID]*kit.TestFullNode) - for _, n := range nodes { - peerToNode[n.Pkey.PeerID] = n - } - - bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) - require.NoError(t, err) - - msgHalfBal := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.Div(bal, big.NewInt(2)), - } - mu := uuid.New() - smHalfBal, err := node0.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ - MsgUuid: mu, - }) - require.NoError(t, err) - mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate0 := getRaftState(ctx, t, &node0) - rstate1 := getRaftState(ctx, t, &node1) - rstate2 := 
getRaftState(ctx, t, &node2) - - require.True(t, reflect.DeepEqual(rstate0, rstate1)) - require.True(t, reflect.DeepEqual(rstate0, rstate2)) - - leader, err := node1.RaftLeader(ctx) - require.NoError(t, err) - leaderNode := peerToNode[leader] - - err = leaderNode.Stop(ctx) - require.NoError(t, err) - oldLeaderNode := leaderNode - - time.Sleep(5 * time.Second) - - newLeader := leader - for _, n := range nodes { - if n != leaderNode { - newLeader, err = n.RaftLeader(ctx) - require.NoError(t, err) - require.NotEqual(t, newLeader, leader) - } - } - - require.NotEqual(t, newLeader, leader) - leaderNode = peerToNode[newLeader] - - msg2 := &types.Message{ - From: miner.OwnerKey.Address, - To: leaderNode.DefaultKey.Address, - Value: big.NewInt(100000), - } - mu2 := uuid.New() - signedMsg2, err := leaderNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{ - MsgUuid: mu2, - }) - require.NoError(t, err) - mLookup, err = leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate := getRaftState(ctx, t, leaderNode) - - for _, n := range nodes { - if n != oldLeaderNode { - rs := getRaftState(ctx, t, n) - require.True(t, reflect.DeepEqual(rs, rstate)) - } - } -} - -func TestRaftStateMiner(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - setup(ctx, t, &node0, &node1, &node2, &miner) - - fmt.Println(node0.WalletList(context.Background())) - fmt.Println(node1.WalletList(context.Background())) - fmt.Println(node2.WalletList(context.Background())) - - bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) - require.NoError(t, err) - - msgHalfBal := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.Div(bal, big.NewInt(2)), - } - mu := uuid.New() - smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ - MsgUuid: mu, - }) - require.NoError(t, err) - mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate0 := getRaftState(ctx, t, &node0) - rstate1 := getRaftState(ctx, t, &node1) - rstate2 := getRaftState(ctx, t, &node2) - - require.EqualValues(t, rstate0, rstate1) - require.EqualValues(t, rstate0, rstate2) -} - -func TestRaftStateLeaderDisconnectsMiner(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - nodes := []*kit.TestFullNode{&node0, &node1, &node2} - - setup(ctx, t, &node0, &node1, &node2, &miner) - - peerToNode := make(map[peer.ID]*kit.TestFullNode) - for _, n := range nodes { - peerToNode[n.Pkey.PeerID] = n - } - - leader, err := node0.RaftLeader(ctx) - require.NoError(t, err) - leaderNode := peerToNode[leader] - - // Take leader node down - err = leaderNode.Stop(ctx) - require.NoError(t, err) - oldLeaderNode := leaderNode - - time.Sleep(5 * time.Second) - - newLeader := leader - for _, n := range nodes { - if n != leaderNode { - newLeader, err = n.RaftLeader(ctx) - require.NoError(t, err) - require.NotEqual(t, newLeader, leader) - } - } - - require.NotEqual(t, newLeader, leader) - leaderNode = peerToNode[newLeader] - - msg2 := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: 
big.NewInt(100000), - } - mu2 := uuid.New() - - signedMsg2, err := miner.FullNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(config.DefaultDefaultMaxFee), - MsgUuid: mu2, - }) - require.NoError(t, err) - - mLookup, err := leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate := getRaftState(ctx, t, leaderNode) - - for _, n := range nodes { - if n != oldLeaderNode { - rs := getRaftState(ctx, t, n) - require.True(t, reflect.DeepEqual(rs, rstate)) - } - } -} - -// Miner sends message on leader -// Leader disconnects -// Call StateWaitMsg on new leader -func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - nodes := []*kit.TestFullNode{&node0, &node1, &node2} - - setup(ctx, t, &node0, &node1, &node2, &miner) - - peerToNode := make(map[peer.ID]*kit.TestFullNode) - for _, n := range nodes { - peerToNode[n.Pkey.PeerID] = n - } - - bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) - require.NoError(t, err) - - msgHalfBal := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.Div(bal, big.NewInt(2)), - } - mu := uuid.New() - smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ - MsgUuid: mu, - }) - require.NoError(t, err) - - leader, err := node0.RaftLeader(ctx) - require.NoError(t, err) - leaderNode := peerToNode[leader] - - // Take leader node down - err = leaderNode.Stop(ctx) - require.NoError(t, err) - oldLeaderNode := leaderNode - - time.Sleep(5 * time.Second) - - // Check if all active nodes update their leader - newLeader := leader - for _, n := range nodes { - if n != leaderNode { - newLeader, err = n.RaftLeader(ctx) - require.NoError(t, err) - require.NotEqual(t, newLeader, leader) - } - } - - require.NotEqual(t, newLeader, leader) - leaderNode = peerToNode[newLeader] - - mLookup, err := leaderNode.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - - rstate := getRaftState(ctx, t, leaderNode) - - // Check if Raft state is consistent on all active nodes - for _, n := range nodes { - if n != oldLeaderNode { - rs := getRaftState(ctx, t, n) - require.True(t, reflect.DeepEqual(rs, rstate)) - } - } -} - -func TestChainStoreSync(t *testing.T) { - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - miner kit.TestMiner - ) - - nodes := []*kit.TestFullNode{&node0, &node1, &node2} - - setup(ctx, t, &node0, &node1, &node2, &miner) - - peerToNode := make(map[peer.ID]*kit.TestFullNode) - for _, n := range nodes { - peerToNode[n.Pkey.PeerID] = n - } - - bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) - require.NoError(t, err) - - leader, err := node0.RaftLeader(ctx) - require.NoError(t, err) - leaderNode := peerToNode[leader] - - msgHalfBal := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.Div(bal, big.NewInt(2)), - } - mu := uuid.New() - smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ - MsgUuid: mu, - }) - require.NoError(t, err) - - for _, n := range nodes { - fmt.Println(n != leaderNode) - if n != 
leaderNode { - mLookup, err := n.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) - require.NoError(t, err) - require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) - //break - } - } -} - -func TestGoRPCAuth(t *testing.T) { - // TODO Fix Raft, then enable this test. https://github.com/filecoin-project/lotus/issues/9888 - t.SkipNow() - - blockTime := 1 * time.Second - - kit.QuietMiningLogs() - ctx := context.Background() - - var ( - node0 kit.TestFullNode - node1 kit.TestFullNode - node2 kit.TestFullNode - node3 kit.TestFullNode - miner kit.TestMiner - ) - - pkey0, _ := generatePrivKey() - pkey1, _ := generatePrivKey() - pkey2, _ := generatePrivKey() - - pkeys := []*kit.Libp2p{pkey0, pkey1, pkey2} - initPeerSet := []string{} - for _, pkey := range pkeys { - initPeerSet = append(initPeerSet, "/p2p/"+pkey.PeerID.String()) - } - - raftOps := kit.ConstructorOpts( - node.Override(new(*gorpc.Client), modules.NewRPCClient), - node.Override(new(*consensus.ClusterRaftConfig), func() *consensus.ClusterRaftConfig { - cfg := consensus.DefaultClusterRaftConfig() - cfg.InitPeerset = initPeerSet - return cfg - }), - node.Override(new(*consensus.Consensus), consensus.NewConsensusWithRPCClient(false)), - node.Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus), - node.Override(new(messagesigner.MsgSigner), func(ms *messagesigner.MessageSignerConsensus) *messagesigner.MessageSignerConsensus { return ms }), - node.Override(new(*modules.RPCHandler), modules.NewRPCHandler), - node.Override(node.GoRPCServer, modules.NewRPCServer), - ) - //raftOps := kit.ConstructorOpts() - - ens := kit.NewEnsemble(t).FullNode(&node0, raftOps, kit.ThroughRPC()).FullNode(&node1, raftOps, kit.ThroughRPC()).FullNode(&node2, raftOps, kit.ThroughRPC()).FullNode(&node3, raftOps) - node0.AssignPrivKey(pkey0) - node1.AssignPrivKey(pkey1) - node2.AssignPrivKey(pkey2) - - nodes := []*kit.TestFullNode{&node0, &node1, &node2} - wrappedFullNode := kit.MergeFullNodes(nodes) - - ens.MinerEnroll(&miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC()) - ens.Start() - - // Import miner wallet to all nodes - addr0, err := node0.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - addr1, err := node1.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - addr2, err := node2.WalletImport(ctx, &miner.OwnerKey.KeyInfo) - require.NoError(t, err) - - fmt.Println(addr0, addr1, addr2) - - ens.InterconnectAll() - - ens.AddInactiveMiner(&miner) - ens.Start() - - ens.InterconnectAll().BeginMining(blockTime) - - leader, err := node0.RaftLeader(ctx) - require.NoError(t, err) - - client := node3.FullNode.(*impl.FullNodeAPI).RaftAPI.MessageSigner.Consensus.RpcClient - method := "MpoolPushMessage" - - msg := &types.Message{ - From: miner.OwnerKey.Address, - To: node0.DefaultKey.Address, - Value: big.NewInt(100000), - } - msgWhole := &api.MpoolMessageWhole{Msg: msg} - var ret types.SignedMessage - - err = client.CallContext(ctx, leader, "Consensus", method, msgWhole, &ret) - require.True(t, gorpc.IsAuthorizationError(err)) - -} diff --git a/lib/consensus/raft/config.go b/lib/consensus/raft/config.go deleted file mode 100644 index bdd82c108..000000000 --- a/lib/consensus/raft/config.go +++ /dev/null @@ -1,135 +0,0 @@ -package consensus - -import ( - "io" - "path/filepath" - "time" - - hraft "github.com/hashicorp/raft" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/repo" -) - -// Configuration 
defaults -var ( - DefaultDataSubFolder = "raft-cluster" - DefaultWaitForLeaderTimeout = 15 * time.Second - DefaultCommitRetries = 1 - DefaultNetworkTimeout = 100 * time.Second - DefaultCommitRetryDelay = 200 * time.Millisecond - DefaultBackupsRotate = 6 -) - -// ClusterRaftConfig allows to configure the Raft Consensus component for the node cluster. -type ClusterRaftConfig struct { - // config to enabled node cluster with raft consensus - ClusterModeEnabled bool - // A folder to store Raft's data. - DataFolder string - // InitPeerset provides the list of initial cluster peers for new Raft - // peers (with no prior state). It is ignored when Raft was already - // initialized or when starting in staging mode. - InitPeerset []string - // LeaderTimeout specifies how long to wait for a leader before - // failing an operation. - WaitForLeaderTimeout time.Duration - // NetworkTimeout specifies how long before a Raft network - // operation is timed out - NetworkTimeout time.Duration - // CommitRetries specifies how many times we retry a failed commit until - // we give up. - CommitRetries int - // How long to wait between retries - CommitRetryDelay time.Duration - // BackupsRotate specifies the maximum number of Raft's DataFolder - // copies that we keep as backups (renaming) after cleanup. - BackupsRotate int - // A Hashicorp Raft's configuration object. - RaftConfig *hraft.Config - - // Tracing enables propagation of contexts across binary boundaries. - Tracing bool -} - -func DefaultClusterRaftConfig() *ClusterRaftConfig { - var cfg ClusterRaftConfig - cfg.DataFolder = "" // empty so it gets omitted - cfg.InitPeerset = []string{} - cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout - cfg.NetworkTimeout = DefaultNetworkTimeout - cfg.CommitRetries = DefaultCommitRetries - cfg.CommitRetryDelay = DefaultCommitRetryDelay - cfg.BackupsRotate = DefaultBackupsRotate - cfg.RaftConfig = hraft.DefaultConfig() - - // These options are imposed over any Default Raft Config. - cfg.RaftConfig.ShutdownOnRemove = false - cfg.RaftConfig.LocalID = "will_be_set_automatically" - - // Set up logging - cfg.RaftConfig.LogOutput = io.Discard - return &cfg -} - -func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftConfig { - var cfg ClusterRaftConfig - cfg.DataFolder = userRaftConfig.DataFolder - cfg.InitPeerset = userRaftConfig.InitPeersetMultiAddr - cfg.WaitForLeaderTimeout = time.Duration(userRaftConfig.WaitForLeaderTimeout) - cfg.NetworkTimeout = time.Duration(userRaftConfig.NetworkTimeout) - cfg.CommitRetries = userRaftConfig.CommitRetries - cfg.CommitRetryDelay = time.Duration(userRaftConfig.CommitRetryDelay) - cfg.BackupsRotate = userRaftConfig.BackupsRotate - - // Keep this to be default hraft config for now - cfg.RaftConfig = hraft.DefaultConfig() - - // These options are imposed over any Default Raft Config. - cfg.RaftConfig.ShutdownOnRemove = false - cfg.RaftConfig.LocalID = "will_be_set_automatically" - - // Set up logging - cfg.RaftConfig.LogOutput = io.Discard - - return &cfg - -} - -// Validate checks that this configuration has working values, -// at least in appearance. 
-func ValidateConfig(cfg *ClusterRaftConfig) error { - if cfg.RaftConfig == nil { - return xerrors.Errorf("no hashicorp/raft.Config") - } - if cfg.WaitForLeaderTimeout <= 0 { - return xerrors.Errorf("wait_for_leader_timeout <= 0") - } - - if cfg.NetworkTimeout <= 0 { - return xerrors.Errorf("network_timeout <= 0") - } - - if cfg.CommitRetries < 0 { - return xerrors.Errorf("commit_retries is invalid") - } - - if cfg.CommitRetryDelay <= 0 { - return xerrors.Errorf("commit_retry_delay is invalid") - } - - if cfg.BackupsRotate <= 0 { - return xerrors.Errorf("backups_rotate should be larger than 0") - } - - return hraft.ValidateConfig(cfg.RaftConfig) -} - -// GetDataFolder returns the Raft data folder that we are using. -func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string { - if cfg.DataFolder == "" { - return filepath.Join(repo.Path(), DefaultDataSubFolder) - } - return filepath.Join(repo.Path(), cfg.DataFolder) -} diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go deleted file mode 100644 index d74f200fa..000000000 --- a/lib/consensus/raft/consensus.go +++ /dev/null @@ -1,512 +0,0 @@ -// Package raft implements a Consensus component for IPFS Cluster which uses -// Raft (go-libp2p-raft). -package consensus - -import ( - "bytes" - "context" - "errors" - "fmt" - "sort" - "time" - - "github.com/google/uuid" - "golang.org/x/exp/slices" - - addr "github.com/filecoin-project/go-address" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/messagepool" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/addrutil" - "github.com/filecoin-project/lotus/node/repo" - - //ds "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log/v2" - consensus "github.com/libp2p/go-libp2p-consensus" - rpc "github.com/libp2p/go-libp2p-gorpc" - libp2praft "github.com/libp2p/go-libp2p-raft" - host "github.com/libp2p/go-libp2p/core/host" - peer "github.com/libp2p/go-libp2p/core/peer" -) - -var logger = logging.Logger("raft") - -type RaftState struct { - NonceMap api.NonceMapType - MsgUuids api.MsgUuidMapType - - // TODO: add comment explaining why this is needed - // We need a reference to the messagepool in the raft state in order to - // sync messages that have been sent by the leader node - // Miner calls StateWaitMsg after MpoolPushMessage to check if the message has - // landed on chain. This check requires the message be stored in the local chainstore - // If a leadernode goes down after sending a message to the chain and is replaced by - // another node, the other node needs to have this message in its chainstore for the - // above check to succeed. - - // This is because the miner only stores signed CIDs but the message received from in a - // block will be unsigned (for BLS). 
Hence, the process relies on the node to store the - // signed message which holds a copy of the unsigned message to properly perform all the - // needed checks - Mpool *messagepool.MessagePool -} - -func newRaftState(mpool *messagepool.MessagePool) *RaftState { - return &RaftState{ - NonceMap: make(map[addr.Address]uint64), - MsgUuids: make(map[uuid.UUID]*types.SignedMessage), - Mpool: mpool, - } -} - -type ConsensusOp struct { - Nonce uint64 `codec:"nonce,omitempty"` - Uuid uuid.UUID `codec:"uuid,omitempty"` - Addr addr.Address `codec:"addr,omitempty"` - SignedMsg *types.SignedMessage `codec:"signedMsg,omitempty"` -} - -func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) { - s := state.(*RaftState) - s.NonceMap[c.Addr] = c.Nonce - if c.SignedMsg != nil { - - // Deep copy to tmp - var buffer bytes.Buffer - err := c.SignedMsg.MarshalCBOR(&buffer) - if err != nil { - return nil, err - } - tmp, err := types.DecodeSignedMessage(buffer.Bytes()) - if err != nil { - return nil, err - } - s.MsgUuids[c.Uuid] = tmp - - _, err = s.Mpool.Push(context.TODO(), tmp, false) - // Since this is only meant to keep messages in sync, ignore any error which - // shows the message already exists in the mpool - if err != nil && !api.ErrorIsIn(err, []error{messagepool.ErrExistingNonce}) { - return nil, err - } - } - - return s, nil -} - -var _ consensus.Op = &ConsensusOp{} - -// Consensus handles the work of keeping a shared-state between -// the peers of a Lotus Cluster, as well as modifying that state and -// applying any updates in a thread-safe manner. -type Consensus struct { - ctx context.Context - cancel func() - config *ClusterRaftConfig - - host host.Host - - consensus consensus.OpLogConsensus - actor consensus.Actor - raft *raftWrapper - state *RaftState - - RpcClient *rpc.Client - rpcReady chan struct{} - readyCh chan struct{} - - peerSet []peer.ID - repo repo.LockedRepo -} - -// NewConsensus builds a new ClusterConsensus component using Raft. -// -// Raft saves state snapshots regularly and persists log data in a bolt -// datastore. Therefore, unless memory usage is a concern, it is recommended -// to use an in-memory go-datastore as store parameter. -// -// The staging parameter controls if the Raft peer should start in -// staging mode (used when joining a new Raft peerset with other peers). 
-func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, repo repo.LockedRepo, staging bool) (*Consensus, error) { - err := ValidateConfig(cfg) - if err != nil { - return nil, err - } - - ctx, cancel := context.WithCancel(context.Background()) - - logger.Debug("starting Consensus and waiting for a leader...") - state := newRaftState(mpool) - - consensus := libp2praft.NewOpLog(state, &ConsensusOp{}) - - raft, err := newRaftWrapper(host, cfg, consensus.FSM(), repo, staging) - if err != nil { - logger.Error("error creating raft: ", err) - cancel() - return nil, err - } - actor := libp2praft.NewActor(raft.raft) - consensus.SetActor(actor) - - peers := []peer.ID{} - addrInfos, err := addrutil.ParseAddresses(ctx, cfg.InitPeerset) - if err != nil { - logger.Error("error parsing addresses: ", err) - cancel() - return nil, err - } - - for _, addrInfo := range addrInfos { - peers = append(peers, addrInfo.ID) - - // Add peer to address book - host.Peerstore().AddAddrs(addrInfo.ID, addrInfo.Addrs, time.Hour*100) - } - - cc := &Consensus{ - ctx: ctx, - cancel: cancel, - config: cfg, - host: host, - consensus: consensus, - actor: actor, - state: state, - raft: raft, - peerSet: peers, - rpcReady: make(chan struct{}, 1), - readyCh: make(chan struct{}, 1), - repo: repo, - } - - go cc.finishBootstrap() - return cc, nil - -} - -// TODO: Merge with NewConsensus and remove the rpcReady chan -func NewConsensusWithRPCClient(staging bool) func(host host.Host, - cfg *ClusterRaftConfig, - rpcClient *rpc.Client, - mpool *messagepool.MessagePool, - repo repo.LockedRepo, -) (*Consensus, error) { - - return func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool, repo repo.LockedRepo) (*Consensus, error) { - cc, err := NewConsensus(host, cfg, mpool, repo, staging) - if err != nil { - return nil, err - } - cc.RpcClient = rpcClient - cc.rpcReady <- struct{}{} - return cc, nil - } -} - -// WaitForSync waits for a leader and for the state to be up to date, then returns. -func (cc *Consensus) WaitForSync(ctx context.Context) error { - - leaderCtx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout) - defer cancel() - - // 1 - wait for leader - // 2 - wait until we are a Voter - // 3 - wait until last index is applied - - // From raft docs: - - // once a staging server receives enough log entries to be sufficiently - // caught up to the leader's log, the leader will invoke a membership - // change to change the Staging server to a Voter - - // Thus, waiting to be a Voter is a guarantee that we have a reasonable - // up to date state. Otherwise, we might return too early (see - // https://github.com/ipfs-cluster/ipfs-cluster/issues/378) - - _, err := cc.raft.WaitForLeader(leaderCtx) - if err != nil { - return errors.New("error waiting for leader: " + err.Error()) - } - - err = cc.raft.WaitForVoter(ctx) - if err != nil { - return errors.New("error waiting to become a Voter: " + err.Error()) - } - - err = cc.raft.WaitForUpdates(ctx) - if err != nil { - return errors.New("error waiting for consensus updates: " + err.Error()) - } - return nil -} - -// waits until there is a consensus leader and syncs the state -// to the tracker. If errors happen, this will return and never -// signal the component as Ready. -func (cc *Consensus) finishBootstrap() { - // wait until we have RPC to perform any actions. - select { - case <-cc.ctx.Done(): - return - case <-cc.rpcReady: - } - - // Sometimes bootstrap is a no-Op. 
It only applies when - // no state exists and staging=false. - _, err := cc.raft.Bootstrap() - if err != nil { - return - } - - logger.Debugf("Bootstrap finished") - err = cc.WaitForSync(cc.ctx) - if err != nil { - return - } - logger.Debug("Raft state is now up to date") - logger.Debug("consensus ready") - cc.readyCh <- struct{}{} -} - -// Shutdown stops the component so it will not process any -// more updates. The underlying consensus is permanently -// shutdown, along with the libp2p transport. -func (cc *Consensus) Shutdown(ctx context.Context) error { - - logger.Info("stopping Consensus component") - - // Raft Shutdown - err := cc.raft.Shutdown(ctx) - if err != nil { - logger.Error(err) - } - - cc.cancel() - close(cc.rpcReady) - return nil -} - -// Ready returns a channel which is signaled when the Consensus -// algorithm has finished bootstrapping and is ready to use -func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} { - return cc.readyCh -} - -// IsTrustedPeer returns true. In Raft we trust all peers. -func (cc *Consensus) IsTrustedPeer(ctx context.Context, p peer.ID) bool { - return slices.Contains(cc.peerSet, p) -} - -// Trust is a no-Op. -func (cc *Consensus) Trust(ctx context.Context, pid peer.ID) error { return nil } - -// Distrust is a no-Op. -func (cc *Consensus) Distrust(ctx context.Context, pid peer.ID) error { return nil } - -// returns true if the operation was redirected to the leader -// note that if the leader just dissappeared, the rpc call will -// fail because we haven't heard that it's gone. -func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error) { - ctx := cc.ctx - - var finalErr error - - // Retry redirects - for i := 0; i <= cc.config.CommitRetries; i++ { - logger.Debugf("redirect try %d", i) - leader, err := cc.Leader(ctx) - - // No leader, wait for one - if err != nil { - logger.Warn("there seems to be no leader. Waiting for one") - rctx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout) - defer cancel() - pidstr, err := cc.raft.WaitForLeader(rctx) - - // means we timed out waiting for a leader - // we don't retry in this case - if err != nil { - return false, fmt.Errorf("timed out waiting for leader: %s", err) - } - leader, err = peer.Decode(pidstr) - if err != nil { - return false, err - } - } - - logger.Infof("leader: %s, curr host: %s, peerSet: %s", leader, cc.host.ID(), cc.peerSet) - - // We are the leader. Do not redirect - if leader == cc.host.ID() { - return false, nil - } - - logger.Debugf("redirecting %s to leader: %s", method, leader) - finalErr = cc.RpcClient.CallContext( - ctx, - leader, - "Consensus", - method, - arg, - ret, - ) - if finalErr != nil { - logger.Errorf("retrying to redirect request to leader: %s", finalErr) - time.Sleep(2 * cc.config.RaftConfig.HeartbeatTimeout) - continue - } - break - } - - // We tried to redirect, but something happened - return true, finalErr -} - -// commit submits a cc.consensus commit. It retries upon failures. -func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error { - - var finalErr error - for i := 0; i <= cc.config.CommitRetries; i++ { - logger.Debugf("attempt #%d: committing %+v", i, op) - - // this means we are retrying - if finalErr != nil { - logger.Errorf("retrying upon failed commit (retry %d): %s ", - i, finalErr) - } - - // Being here means we are the LEADER. We can commit. 
- // now commit the changes to our state - _, finalErr = cc.consensus.CommitOp(op) - if finalErr != nil { - goto RETRY - } - - RETRY: - time.Sleep(cc.config.CommitRetryDelay) - } - return finalErr -} - -// AddPeer adds a new peer to participate in this consensus. It will -// forward the operation to the leader if this is not it. -func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error { - var finalErr error - for i := 0; i <= cc.config.CommitRetries; i++ { - logger.Debugf("attempt #%d: AddPeer %s", i, pid) - if finalErr != nil { - logger.Errorf("retrying to add peer. Attempt #%d failed: %s", i, finalErr) - } - ok, err := cc.RedirectToLeader("AddPeer", pid, struct{}{}) - if err != nil || ok { - return err - } - // Being here means we are the leader and can commit - finalErr = cc.raft.AddPeer(ctx, pid) - if finalErr != nil { - time.Sleep(cc.config.CommitRetryDelay) - continue - } - logger.Infof("peer added to Raft: %s", pid) - break - } - return finalErr -} - -// RmPeer removes a peer from this consensus. It will -// forward the operation to the leader if this is not it. -func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error { - var finalErr error - for i := 0; i <= cc.config.CommitRetries; i++ { - logger.Debugf("attempt #%d: RmPeer %s", i, pid) - if finalErr != nil { - logger.Errorf("retrying to remove peer. Attempt #%d failed: %s", i, finalErr) - } - ok, err := cc.RedirectToLeader("RmPeer", pid, struct{}{}) - if err != nil || ok { - return err - } - // Being here means we are the leader and can commit - finalErr = cc.raft.RemovePeer(ctx, pid.String()) - if finalErr != nil { - time.Sleep(cc.config.CommitRetryDelay) - continue - } - logger.Infof("peer removed from Raft: %s", pid) - break - } - return finalErr -} - -// RaftState retrieves the current consensus RaftState. It may error if no RaftState has -// been agreed upon or the state is not consistent. The returned RaftState is the -// last agreed-upon RaftState known by this node. No writes are allowed, as all -// writes to the shared state should happen through the Consensus component -// methods. -func (cc *Consensus) State(ctx context.Context) (*RaftState, error) { - st, err := cc.consensus.GetLogHead() - if err == libp2praft.ErrNoState { - return newRaftState(nil), nil - } - - if err != nil { - return nil, err - } - state, ok := st.(*RaftState) - if !ok { - return nil, errors.New("wrong state type") - } - return state, nil -} - -// Leader returns the peerID of the Leader of the -// cluster. It returns an error when there is no leader. -func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) { - // Note the hard-dependency on raft here... - raftactor := cc.actor.(*libp2praft.Actor) - return raftactor.Leader() -} - -// Clean removes the Raft persisted state. -func (cc *Consensus) Clean(ctx context.Context) error { - //return CleanupRaft(cc.config) - return nil -} - -//Rollback replaces the current agreed-upon -//state with the state provided. Only the consensus leader -//can perform this operation. -//func (cc *Consensus) Rollback(state RaftState) error { -// // This is unused. It *might* be used for upgrades. -// // There is rather untested magic in libp2p-raft's FSM() -// // to make this possible. -// return cc.consensus.Rollback(state) -//} - -// Peers return the current list of peers in the consensus. -// The list will be sorted alphabetically. 
-func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) { - - peers := []peer.ID{} - raftPeers, err := cc.raft.Peers(ctx) - if err != nil { - return nil, fmt.Errorf("cannot retrieve list of peers: %s", err) - } - - sort.Strings(raftPeers) - - for _, p := range raftPeers { - id, err := peer.Decode(p) - if err != nil { - panic("could not decode peer") - } - peers = append(peers, id) - } - return peers, nil -} - -func (cc *Consensus) IsLeader(ctx context.Context) bool { - leader, _ := cc.Leader(ctx) - return leader == cc.host.ID() -} diff --git a/lib/consensus/raft/interfaces.go b/lib/consensus/raft/interfaces.go deleted file mode 100644 index 2b77d1ebe..000000000 --- a/lib/consensus/raft/interfaces.go +++ /dev/null @@ -1,41 +0,0 @@ -package consensus - -import ( - "context" - - consensus "github.com/libp2p/go-libp2p-consensus" - "github.com/libp2p/go-libp2p/core/peer" -) - -type ConsensusAPI interface { - // Returns a channel to signal that the consensus layer is ready - // allowing the main component to wait for it during start. - Ready(context.Context) <-chan struct{} - - AddPeer(context.Context, peer.ID) error - RmPeer(context.Context, peer.ID) error - State(context.Context) (consensus.State, error) - // Provide a node which is responsible to perform - // specific tasks which must only run in 1 cluster peer. - Leader(context.Context) (peer.ID, error) - // Only returns when the consensus state has all log - // updates applied to it. - WaitForSync(context.Context) error - // Clean removes all consensus data. - Clean(context.Context) error - // Peers returns the peerset participating in the Consensus. - Peers(context.Context) ([]peer.ID, error) - // IsTrustedPeer returns true if the given peer is "trusted". - // This will grant access to more rpc endpoints and a - // non-trusted one. This should be fast as it will be - // called repeatedly for every remote RPC request. - IsTrustedPeer(context.Context, peer.ID) bool - // Trust marks a peer as "trusted". - Trust(context.Context, peer.ID) error - // Distrust removes a peer from the "trusted" set. - Distrust(context.Context, peer.ID) error - // Returns true if current node is the cluster leader - IsLeader(ctx context.Context) bool - - Shutdown(context.Context) error -} diff --git a/lib/consensus/raft/raft.go b/lib/consensus/raft/raft.go deleted file mode 100644 index 8541e6f87..000000000 --- a/lib/consensus/raft/raft.go +++ /dev/null @@ -1,563 +0,0 @@ -package consensus - -import ( - "context" - "errors" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/hashicorp/go-hclog" - hraft "github.com/hashicorp/raft" - raftboltdb "github.com/hashicorp/raft-boltdb" - "github.com/ipfs/go-log/v2" - p2praft "github.com/libp2p/go-libp2p-raft" - host "github.com/libp2p/go-libp2p/core/host" - peer "github.com/libp2p/go-libp2p/core/peer" - "go.uber.org/multierr" - "go.uber.org/zap" - - "github.com/filecoin-project/lotus/lib/addrutil" - "github.com/filecoin-project/lotus/node/repo" -) - -var raftLogger = log.Logger("raft-cluster") - -// ErrWaitingForSelf is returned when we are waiting for ourselves to depart -// the peer set, which won't happen -var errWaitingForSelf = errors.New("waiting for ourselves to depart") - -// RaftMaxSnapshots indicates how many snapshots to keep in the consensus data -// folder. -// TODO: Maybe include this in Config. Not sure how useful it is to touch -// this anyways. -var RaftMaxSnapshots = 5 - -// RaftLogCacheSize is the maximum number of logs to cache in-memory. 
-// This is used to reduce disk I/O for the recently committed entries. -var RaftLogCacheSize = 512 - -// How long we wait for updates during shutdown before snapshotting -var waitForUpdatesShutdownTimeout = 5 * time.Second -var waitForUpdatesInterval = 400 * time.Millisecond - -// How many times to retry snapshotting when shutting down -var maxShutdownSnapshotRetries = 5 - -// raftWrapper wraps the hraft.Raft object and related things like the -// different stores used or the hraft.Configuration. -// Its methods provide functionality for working with Raft. -type raftWrapper struct { - ctx context.Context - cancel context.CancelFunc - raft *hraft.Raft - config *ClusterRaftConfig - host host.Host - serverConfig hraft.Configuration - transport *hraft.NetworkTransport - snapshotStore hraft.SnapshotStore - logStore hraft.LogStore - stableStore hraft.StableStore - boltdb *raftboltdb.BoltStore - repo repo.LockedRepo - staging bool -} - -// newRaftWrapper creates a Raft instance and initializes -// everything leaving it ready to use. Note, that Bootstrap() should be called -// to make sure the raft instance is usable. -func newRaftWrapper( - host host.Host, - cfg *ClusterRaftConfig, - fsm hraft.FSM, - repo repo.LockedRepo, - staging bool, -) (*raftWrapper, error) { - - raftW := &raftWrapper{} - raftW.config = cfg - raftW.host = host - raftW.staging = staging - raftW.repo = repo - // Set correct LocalID - cfg.RaftConfig.LocalID = hraft.ServerID(host.ID().String()) - - df := cfg.GetDataFolder(repo) - err := makeDataFolder(df) - if err != nil { - return nil, err - } - - err = raftW.makeServerConfig() - if err != nil { - return nil, err - } - - err = raftW.makeTransport() - if err != nil { - return nil, err - } - - err = raftW.makeStores() - if err != nil { - return nil, err - } - - raftLogger.Debug("creating Raft") - raftW.raft, err = hraft.NewRaft( - cfg.RaftConfig, - fsm, - raftW.logStore, - raftW.stableStore, - raftW.snapshotStore, - raftW.transport, - ) - if err != nil { - raftLogger.Error("initializing raft: ", err) - return nil, err - } - - raftW.ctx, raftW.cancel = context.WithCancel(context.Background()) - - return raftW, nil -} - -// makeDataFolder creates the folder that is meant to store Raft data. Ensures -// we always set 0700 mode. -func makeDataFolder(folder string) error { - return os.MkdirAll(folder, 0700) -} - -func (rw *raftWrapper) makeTransport() (err error) { - raftLogger.Debug("creating libp2p Raft transport") - rw.transport, err = p2praft.NewLibp2pTransport( - rw.host, - rw.config.NetworkTimeout, - ) - return err -} - -func (rw *raftWrapper) makeStores() error { - raftLogger.Debug("creating BoltDB store") - df := rw.config.GetDataFolder(rw.repo) - store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db")) - if err != nil { - return err - } - - // wraps the store in a LogCache to improve performance. 
- // See consul/agent/consul/server.go - cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store) - if err != nil { - return err - } - - raftLogger.Debug("creating raft snapshot store") - snapstore, err := hraft.NewFileSnapshotStoreWithLogger( - df, - RaftMaxSnapshots, - hclog.FromStandardLogger(zap.NewStdLog(log.Logger("raft-snapshot").SugaredLogger.Desugar()), hclog.DefaultOptions), - ) - if err != nil { - return err - } - - rw.logStore = cacheStore - rw.stableStore = store - rw.snapshotStore = snapstore - rw.boltdb = store - return nil -} - -// Bootstrap calls BootstrapCluster on the Raft instance with a valid -// Configuration (generated from InitPeerset) when Raft has no state -// and we are not setting up a staging peer. It returns if Raft -// was boostrapped (true) and an error. -func (rw *raftWrapper) Bootstrap() (bool, error) { - logger.Debug("checking for existing raft states") - hasState, err := hraft.HasExistingState( - rw.logStore, - rw.stableStore, - rw.snapshotStore, - ) - if err != nil { - return false, err - } - - if hasState { - logger.Debug("raft cluster is already initialized") - - // Inform the user that we are working with a pre-existing peerset - logger.Info("existing Raft state found! raft.InitPeerset will be ignored") - cf := rw.raft.GetConfiguration() - if err := cf.Error(); err != nil { - logger.Debug(err) - return false, err - } - currentCfg := cf.Configuration() - srvs := "" - for _, s := range currentCfg.Servers { - srvs += fmt.Sprintf(" %s\n", s.ID) - } - - logger.Debugf("Current Raft Peerset:\n%s\n", srvs) - return false, nil - } - - if rw.staging { - logger.Debug("staging servers do not need initialization") - logger.Info("peer is ready to join a cluster") - return false, nil - } - - voters := "" - for _, s := range rw.serverConfig.Servers { - voters += fmt.Sprintf(" %s\n", s.ID) - } - - logger.Infof("initializing raft cluster with the following voters:\n%s\n", voters) - - future := rw.raft.BootstrapCluster(rw.serverConfig) - if err := future.Error(); err != nil { - logger.Error("bootstrapping cluster: ", err) - return true, err - } - return true, nil -} - -// create Raft servers configuration. The result is used -// by Bootstrap() when it proceeds to Bootstrap. -func (rw *raftWrapper) makeServerConfig() error { - peers := []peer.ID{} - addrInfos, err := addrutil.ParseAddresses(context.Background(), rw.config.InitPeerset) - if err != nil { - return err - } - for _, addrInfo := range addrInfos { - peers = append(peers, addrInfo.ID) - } - rw.serverConfig = makeServerConf(append(peers, rw.host.ID())) - return nil -} - -// creates a server configuration with all peers as Voters. -func makeServerConf(peers []peer.ID) hraft.Configuration { - sm := make(map[string]struct{}) - - servers := make([]hraft.Server, 0) - - // Servers are peers + self. We avoid duplicate entries below - for _, pid := range peers { - p := pid.String() - _, ok := sm[p] - if !ok { // avoid dups - sm[p] = struct{}{} - servers = append(servers, hraft.Server{ - Suffrage: hraft.Voter, - ID: hraft.ServerID(p), - Address: hraft.ServerAddress(p), - }) - } - } - return hraft.Configuration{Servers: servers} -} - -// WaitForLeader holds until Raft says we have a leader. -// Returns if ctx is canceled. 
-func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
-	ticker := time.NewTicker(time.Second / 2)
-	for {
-		select {
-		case <-ticker.C:
-			if l := rw.raft.Leader(); l != "" {
-				logger.Debug("waitForLeaderTimer")
-				logger.Infof("Current Raft Leader: %s", l)
-				ticker.Stop()
-				return string(l), nil
-			}
-		case <-ctx.Done():
-			return "", ctx.Err()
-		}
-	}
-}
-
-func (rw *raftWrapper) WaitForVoter(ctx context.Context) error {
-	logger.Debug("waiting until we are promoted to a voter")
-
-	pid := hraft.ServerID(rw.host.ID().String())
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-			logger.Debugf("%s: get configuration", pid)
-			configFuture := rw.raft.GetConfiguration()
-			if err := configFuture.Error(); err != nil {
-				return err
-			}
-
-			if isVoter(pid, configFuture.Configuration()) {
-				return nil
-			}
-			logger.Debugf("%s: not voter yet", pid)
-
-			time.Sleep(waitForUpdatesInterval)
-		}
-	}
-}
-
-func isVoter(srvID hraft.ServerID, cfg hraft.Configuration) bool {
-	for _, server := range cfg.Servers {
-		if server.ID == srvID && server.Suffrage == hraft.Voter {
-			return true
-		}
-	}
-	return false
-}
-
-// WaitForUpdates holds until Raft has synced to the last index in the log
-func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
-
-	logger.Debug("Raft state is catching up to the latest known version. Please wait...")
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-			lai := rw.raft.AppliedIndex()
-			li := rw.raft.LastIndex()
-			logger.Debugf("current Raft index: %d/%d",
-				lai, li)
-			if lai == li {
-				return nil
-			}
-			time.Sleep(waitForUpdatesInterval)
-		}
-	}
-}
-
-func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
-
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		default:
-			peers, err := rw.Peers(ctx)
-			if err != nil {
-				return err
-			}
-
-			if len(peers) == 1 && pid == peers[0] && depart {
-				return errWaitingForSelf
-			}
-
-			found := find(peers, pid)
-
-			// departing
-			if depart && !found {
-				return nil
-			}
-
-			// joining
-			if !depart && found {
-				return nil
-			}
-
-			time.Sleep(50 * time.Millisecond)
-		}
-	}
-}
-
-// Snapshot tells Raft to take a snapshot.
-func (rw *raftWrapper) Snapshot() error {
-	future := rw.raft.Snapshot()
-	err := future.Error()
-	if err != nil && err.Error() != hraft.ErrNothingNewToSnapshot.Error() {
-		return err
-	}
-	return nil
-}
-
-// snapshotOnShutdown attempts to take a snapshot before a shutdown.
-// Snapshotting might fail if the raft applied index is not the last index.
-// This waits for the updates and tries to take a snapshot when the
-// applied index is up to date.
-// It will retry if the snapshot still fails, in case more updates have arrived.
-// If waiting for updates times out, it will not try again, since something
-// is wrong. This is a best-effort solution as there is no way to tell Raft
-// to stop processing entries because we want to take a snapshot before
-// shutting down.
-func (rw *raftWrapper) snapshotOnShutdown() error {
-	var err error
-	for i := 0; i < maxShutdownSnapshotRetries; i++ {
-		ctx, cancel := context.WithTimeout(context.Background(), waitForUpdatesShutdownTimeout)
-		err = rw.WaitForUpdates(ctx)
-		cancel()
-		if err != nil {
-			logger.Warn("timed out waiting for state updates before shutdown. 
Snapshotting may fail")
-			return rw.Snapshot()
-		}
-
-		err = rw.Snapshot()
-		if err == nil {
-			return nil // things worked
-		}
-
-		// There was an error
-		err = errors.New("could not snapshot raft: " + err.Error())
-		logger.Warnf("retrying to snapshot (%d/%d)...", i+1, maxShutdownSnapshotRetries)
-	}
-	return err
-}
-
-// Shutdown shuts down Raft and closes the BoltDB.
-func (rw *raftWrapper) Shutdown(ctx context.Context) error {
-
-	rw.cancel()
-
-	var finalErr error
-
-	err := rw.snapshotOnShutdown()
-	if err != nil {
-		finalErr = multierr.Append(finalErr, err)
-	}
-
-	future := rw.raft.Shutdown()
-	err = future.Error()
-	if err != nil {
-		finalErr = multierr.Append(finalErr, err)
-	}
-
-	err = rw.boltdb.Close() // important!
-	if err != nil {
-		finalErr = multierr.Append(finalErr, err)
-	}
-
-	return finalErr
-}
-
-// AddPeer adds a peer to Raft
-func (rw *raftWrapper) AddPeer(ctx context.Context, peerId peer.ID) error {
-
-	// Check that we don't already have it, to avoid wasting
-	// log entries if so.
-	peers, err := rw.Peers(ctx)
-	if err != nil {
-		return err
-	}
-	if find(peers, peerId.String()) {
-		logger.Infof("%s is already a raft peer", peerId.String())
-		return nil
-	}
-
-	err = rw.host.Connect(ctx, peer.AddrInfo{ID: peerId})
-	if err != nil {
-		return err
-	}
-
-	future := rw.raft.AddVoter(
-		hraft.ServerID(peerId.String()),
-		hraft.ServerAddress(peerId.String()),
-		0,
-		0,
-	) // TODO: Extra cfg value?
-	err = future.Error()
-	if err != nil {
-		logger.Error("raft cannot add peer: ", err)
-	}
-	return err
-}
-
-// RemovePeer removes a peer from Raft
-func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
-	// Check that we actually have it, to avoid wasting
-	// log entries if we don't.
-	peers, err := rw.Peers(ctx)
-	if err != nil {
-		return err
-	}
-	if !find(peers, peer) {
-		logger.Infof("%s is not among raft peers", peer)
-		return nil
-	}
-
-	if len(peers) == 1 && peers[0] == peer {
-		return errors.New("cannot remove ourselves from a 1-peer cluster")
-	}
-
-	rmFuture := rw.raft.RemoveServer(
-		hraft.ServerID(peer),
-		0,
-		0,
-	)
-	err = rmFuture.Error()
-	if err != nil {
-		logger.Error("raft cannot remove peer: ", err)
-		return err
-	}
-
-	return nil
-}
-
-// Leader returns Raft's leader. It may be an empty string if
-// there is no leader or it is unknown.
-func (rw *raftWrapper) Leader(ctx context.Context) string {
-	return string(rw.raft.Leader())
-}
-
-func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) {
-	ids := make([]string, 0)
-
-	configFuture := rw.raft.GetConfiguration()
-	if err := configFuture.Error(); err != nil {
-		return nil, err
-	}
-
-	for _, server := range configFuture.Configuration().Servers {
-		ids = append(ids, string(server.ID))
-	}
-
-	return ids, nil
-}
-
-// CleanupRaft moves the current data folder to a backup location
-//func CleanupRaft(cfg *Config) error {
-//	dataFolder := cfg.GetDataFolder()
-//	keep := cfg.BackupsRotate
-//
-//	meta, _, err := latestSnapshot(dataFolder)
-//	if meta == nil && err == nil {
-//		// no snapshots at all. Avoid creating backups
-//		// from empty state folders.
-// logger.Infof("cleaning empty Raft data folder (%s)", dataFolder) -// os.RemoveAll(dataFolder) -// return nil -// } -// -// logger.Infof("cleaning and backing up Raft data folder (%s)", dataFolder) -// dbh := newDataBackupHelper(dataFolder, keep) -// err = dbh.makeBackup() -// if err != nil { -// logger.Warn(err) -// logger.Warn("the state could not be cleaned properly") -// logger.Warn("manual intervention may be needed before starting cluster again") -// } -// return nil -//} - -// only call when Raft is shutdown -func (rw *raftWrapper) Clean() error { - //return CleanupRaft(rw.config) - return nil -} - -func find(s []string, elem string) bool { - for _, selem := range s { - if selem == elem { - return true - } - } - return false -} diff --git a/node/builder.go b/node/builder.go index 128a99f87..1cd4823d5 100644 --- a/node/builder.go +++ b/node/builder.go @@ -127,7 +127,6 @@ const ( SettlePaymentChannelsKey RunPeerTaggerKey SetupFallbackBlockstoresKey - GoRPCServer ConsensusReporterKey diff --git a/node/builder_chain.go b/node/builder_chain.go index 267659f00..348916010 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -3,7 +3,6 @@ package node import ( "os" - gorpc "github.com/libp2p/go-libp2p-gorpc" "go.uber.org/fx" "golang.org/x/xerrors" @@ -31,7 +30,6 @@ import ( "github.com/filecoin-project/lotus/chain/wallet" ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger" "github.com/filecoin-project/lotus/chain/wallet/remotewallet" - raftcns "github.com/filecoin-project/lotus/lib/consensus/raft" "github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/markets/storageadapter" @@ -251,17 +249,6 @@ func ConfigFullNode(c interface{}) Option { Override(new(wallet.Default), wallet.NilDefault), ), - // Chain node cluster enabled - If(cfg.Cluster.ClusterModeEnabled, - Override(new(*gorpc.Client), modules.NewRPCClient), - Override(new(*raftcns.ClusterRaftConfig), raftcns.NewClusterRaftConfig(&cfg.Cluster)), - Override(new(*raftcns.Consensus), raftcns.NewConsensusWithRPCClient(false)), - Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus), - Override(new(messagesigner.MsgSigner), From(new(*messagesigner.MessageSignerConsensus))), - Override(new(*modules.RPCHandler), modules.NewRPCHandler), - Override(GoRPCServer, modules.NewRPCServer), - ), - // Actor event filtering support Override(new(events.EventAPI), From(new(modules.EventAPI))), diff --git a/node/config/def.go b/node/config/def.go index 47c0df98f..d6e38d263 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -106,7 +106,6 @@ func DefaultFullNode() *FullNode { HotstoreMaxSpaceSafetyBuffer: 50_000_000_000, }, }, - Cluster: *DefaultUserRaftConfig(), Fevm: FevmConfig{ EnableEthRPC: false, EthTxHashMappingLifetimeDays: 0, @@ -318,25 +317,3 @@ const ( // worker. The scheduler may assign any task to this worker. 
 	ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
 )
-
-var (
-	DefaultDataSubFolder        = "raft"
-	DefaultWaitForLeaderTimeout = 15 * time.Second
-	DefaultCommitRetries        = 1
-	DefaultNetworkTimeout       = 100 * time.Second
-	DefaultCommitRetryDelay     = 200 * time.Millisecond
-	DefaultBackupsRotate        = 6
-)
-
-func DefaultUserRaftConfig() *UserRaftConfig {
-	var cfg UserRaftConfig
-	cfg.DataFolder = "" // empty so it gets omitted
-	cfg.InitPeersetMultiAddr = []string{}
-	cfg.WaitForLeaderTimeout = Duration(DefaultWaitForLeaderTimeout)
-	cfg.NetworkTimeout = Duration(DefaultNetworkTimeout)
-	cfg.CommitRetries = DefaultCommitRetries
-	cfg.CommitRetryDelay = Duration(DefaultCommitRetryDelay)
-	cfg.BackupsRotate = DefaultBackupsRotate
-
-	return &cfg
-}
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index 5a5bd4a8c..319e23854 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -478,12 +478,6 @@ Set to 0 to keep all mappings`,
 
 			Comment: ``,
 		},
-		{
-			Name: "Cluster",
-			Type: "UserRaftConfig",
-
-			Comment: ``,
-		},
 		{
 			Name: "Fevm",
 			Type: "FevmConfig",
@@ -1428,68 +1422,6 @@ HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer`,
 			Comment: ``,
 		},
 	},
-	"UserRaftConfig": []DocField{
-		{
-			Name: "ClusterModeEnabled",
-			Type: "bool",
-
-			Comment: `EXPERIMENTAL. config to enable a node cluster with raft consensus`,
-		},
-		{
-			Name: "DataFolder",
-			Type: "string",
-
-			Comment: `A folder to store Raft's data.`,
-		},
-		{
-			Name: "InitPeersetMultiAddr",
-			Type: "[]string",
-
-			Comment: `InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
peers (with no prior state). It is ignored when Raft was already
initialized or when starting in staging mode.`,
-		},
-		{
-			Name: "WaitForLeaderTimeout",
-			Type: "Duration",
-
-			Comment: `WaitForLeaderTimeout specifies how long to wait for a leader before
failing an operation.`,
-		},
-		{
-			Name: "NetworkTimeout",
-			Type: "Duration",
-
-			Comment: `NetworkTimeout specifies how long before a Raft network
operation times out`,
-		},
-		{
-			Name: "CommitRetries",
-			Type: "int",
-
-			Comment: `CommitRetries specifies how many times we retry a failed commit until
we give up.`,
-		},
-		{
-			Name: "CommitRetryDelay",
-			Type: "Duration",
-
-			Comment: `How long to wait between retries`,
-		},
-		{
-			Name: "BackupsRotate",
-			Type: "int",
-
-			Comment: `BackupsRotate specifies the maximum number of Raft's DataFolder
copies that we keep as backups (renaming) after cleanup.`,
-		},
-		{
-			Name: "Tracing",
-			Type: "bool",
-
-			Comment: `Tracing enables propagation of contexts across binary boundaries.`,
-		},
-	},
 	"Wallet": []DocField{
 		{
 			Name: "RemoteBackend",
diff --git a/node/config/types.go b/node/config/types.go
index 013e2db1a..46f7b8a06 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -26,7 +26,6 @@ type FullNode struct {
 	Wallet        Wallet
 	Fees          FeeConfig
 	Chainstore    Chainstore
-	Cluster       UserRaftConfig
 	Fevm          FevmConfig
 	Index         IndexConfig
 	FaultReporter FaultReporterConfig
@@ -653,33 +652,6 @@ type FeeConfig struct {
 	DefaultMaxFee types.FIL
 }
-
-type UserRaftConfig struct {
-	// EXPERIMENTAL. config to enable a node cluster with raft consensus
-	ClusterModeEnabled bool
-	// A folder to store Raft's data.
-	DataFolder string
-	// InitPeersetMultiAddr provides the list of initial cluster peers for new Raft
-	// peers (with no prior state). It is ignored when Raft was already
-	// initialized or when starting in staging mode.
-	InitPeersetMultiAddr []string
-	// WaitForLeaderTimeout specifies how long to wait for a leader before
-	// failing an operation.
-	WaitForLeaderTimeout Duration
-	// NetworkTimeout specifies how long before a Raft network
-	// operation times out
-	NetworkTimeout Duration
-	// CommitRetries specifies how many times we retry a failed commit until
-	// we give up.
-	CommitRetries int
-	// How long to wait between retries
-	CommitRetryDelay Duration
-	// BackupsRotate specifies the maximum number of Raft's DataFolder
-	// copies that we keep as backups (renaming) after cleanup.
-	BackupsRotate int
-	// Tracing enables propagation of contexts across binary boundaries.
-	Tracing bool
-}
-
 type FevmConfig struct {
 	// EnableEthRPC enables eth_ rpc, and enables storing a mapping of eth transaction hashes to filecoin message Cids.
 	// This will also enable the RealTimeFilterAPI and HistoricFilterAPI by default, but they can be disabled by config options above.
diff --git a/node/impl/full.go b/node/impl/full.go
index affcc960e..bc555c8c2 100644
--- a/node/impl/full.go
+++ b/node/impl/full.go
@@ -34,7 +34,6 @@ type FullNodeAPI struct {
 	full.MsigAPI
 	full.WalletAPI
 	full.SyncAPI
-	full.RaftAPI
 	full.EthAPI
 
 	DS dtypes.MetadataDS
@@ -119,12 +118,4 @@ func (n *FullNodeAPI) NodeStatus(ctx context.Context, inclChainStatus bool) (sta
 	return status, nil
 }
 
-func (n *FullNodeAPI) RaftState(ctx context.Context) (*api.RaftStateData, error) {
-	return n.RaftAPI.GetRaftState(ctx)
-}
-
-func (n *FullNodeAPI) RaftLeader(ctx context.Context) (peer.ID, error) {
-	return n.RaftAPI.Leader(ctx)
-}
-
 var _ api.FullNode = &FullNodeAPI{}
diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go
index addcc41be..fac48a350 100644
--- a/node/impl/full/mpool.go
+++ b/node/impl/full/mpool.go
@@ -44,8 +44,6 @@ type MpoolAPI struct {
 	WalletAPI
 	GasAPI
 
-	RaftAPI
-
 	MessageSigner messagesigner.MsgSigner
 
 	PushLocks *dtypes.MpoolLocker
@@ -145,20 +143,6 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
 	msg = &cp
 	inMsg := *msg
 
-	// Redirect to the leader if the current node is not the leader. A single non-raft-based node is always the leader
-	if !a.RaftAPI.IsLeader(ctx) {
-		var signedMsg types.SignedMessage
-		redirected, err := a.RaftAPI.RedirectToLeader(ctx, "MpoolPushMessage", api.MpoolMessageWhole{Msg: msg, Spec: spec}, &signedMsg)
-		if err != nil {
-			return nil, err
-		}
-		// It's possible that the current node became the leader between the check and the redirect.
-		// In that case, continue with the rest of execution and only return signedMsg if something was redirected
-		if redirected {
-			return &signedMsg, nil
-		}
-	}
-
 	// Generate spec and uuid if not available in the message
 	if spec == nil {
 		spec = &api.MessageSendSpec{
diff --git a/node/impl/full/raft.go b/node/impl/full/raft.go
deleted file mode 100644
index 8d665ddd5..000000000
--- a/node/impl/full/raft.go
+++ /dev/null
@@ -1,50 +0,0 @@
-package full
-
-import (
-	"context"
-
-	"github.com/libp2p/go-libp2p/core/peer"
-	"go.uber.org/fx"
-	"golang.org/x/xerrors"
-
-	"github.com/filecoin-project/lotus/api"
-	"github.com/filecoin-project/lotus/chain/messagesigner"
-)
-
-type RaftAPI struct {
-	fx.In
-
-	MessageSigner *messagesigner.MessageSignerConsensus `optional:"true"`
-}
-
-func (r *RaftAPI) GetRaftState(ctx context.Context) (*api.RaftStateData, error) {
-	if r.MessageSigner == nil {
-		return nil, xerrors.Errorf("raft consensus not enabled. 
Please check your configuration") - } - raftState, err := r.MessageSigner.GetRaftState(ctx) - if err != nil { - return nil, err - } - return &api.RaftStateData{NonceMap: raftState.NonceMap, MsgUuids: raftState.MsgUuids}, nil -} - -func (r *RaftAPI) Leader(ctx context.Context) (peer.ID, error) { - if r.MessageSigner == nil { - return "", xerrors.Errorf("raft consensus not enabled. Please check your configuration") - } - return r.MessageSigner.Leader(ctx) -} - -func (r *RaftAPI) IsLeader(ctx context.Context) bool { - if r.MessageSigner == nil { - return true - } - return r.MessageSigner.IsLeader(ctx) -} - -func (r *RaftAPI) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { - if r.MessageSigner == nil { - return false, xerrors.Errorf("raft consensus not enabled. Please check your configuration") - } - return r.MessageSigner.RedirectToLeader(ctx, method, arg, ret) -} diff --git a/node/modules/rpc.go b/node/modules/rpc.go deleted file mode 100644 index d76949737..000000000 --- a/node/modules/rpc.go +++ /dev/null @@ -1,55 +0,0 @@ -package modules - -import ( - "context" - - rpc "github.com/libp2p/go-libp2p-gorpc" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - consensus "github.com/filecoin-project/lotus/lib/consensus/raft" - "github.com/filecoin-project/lotus/node/impl/full" -) - -type RPCHandler struct { - mpoolAPI full.MpoolAPI - cons *consensus.Consensus -} - -func NewRPCHandler(mpoolAPI full.MpoolAPI, cons *consensus.Consensus) *RPCHandler { - return &RPCHandler{mpoolAPI, cons} -} - -func (h *RPCHandler) MpoolPushMessage(ctx context.Context, msgWhole *api.MpoolMessageWhole, ret *types.SignedMessage) error { - signedMsg, err := h.mpoolAPI.MpoolPushMessage(ctx, msgWhole.Msg, msgWhole.Spec) - if err != nil { - return err - } - *ret = *signedMsg - return nil -} - -func (h *RPCHandler) AddPeer(ctx context.Context, pid peer.ID, ret *struct{}) error { - return h.cons.AddPeer(ctx, pid) -} - -// Add other consensus RPC calls here - -func NewRPCClient(host host.Host) *rpc.Client { - protocolID := protocol.ID("/rpc/lotus-chain/v0") - return rpc.NewClient(host, protocolID) -} - -func NewRPCServer(ctx context.Context, host host.Host, rpcHandler *RPCHandler) error { - - authF := func(pid peer.ID, svc, method string) bool { - return rpcHandler.cons.IsTrustedPeer(ctx, pid) - } - - protocolID := protocol.ID("/rpc/lotus-chain/v0") - rpcServer := rpc.NewServer(host, protocolID, rpc.WithAuthorizeFunc(authF)) - return rpcServer.RegisterName("Consensus", rpcHandler) -}
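
For readers tracing what the deleted cluster path did on the wire: the RPCHandler above was reachable as a plain go-libp2p-gorpc call against the "Consensus" service that NewRPCServer registered on protocol /rpc/lotus-chain/v0, and a follower's RedirectToLeader amounted to one such call against the leader's peer ID. Below is a minimal client-side sketch of that call shape. It is a sketch, not code from the Lotus tree: the sketch package name, the MsgWhole/SignedMsg stand-in payload types, and the forwardToLeader helper are all illustrative assumptions; only the gorpc client API (NewClient, CallContext) and the service/method/protocol names from the removed code are real.

package sketch

import (
	"context"
	"fmt"

	gorpc "github.com/libp2p/go-libp2p-gorpc"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// MsgWhole and SignedMsg are hypothetical stand-ins for the removed
// api.MpoolMessageWhole argument and types.SignedMessage reply that
// gorpc serialized over the libp2p stream.
type MsgWhole struct{ Raw []byte }

type SignedMsg struct{ Raw []byte }

// forwardToLeader mirrors the redirect that the deleted MpoolPushMessage
// branch performed: dial the raft leader's peer ID and invoke the
// "Consensus" service's MpoolPushMessage method, decoding the reply in place.
func forwardToLeader(ctx context.Context, h host.Host, leader peer.ID, msg *MsgWhole) (*SignedMsg, error) {
	client := gorpc.NewClient(h, protocol.ID("/rpc/lotus-chain/v0"))
	var signed SignedMsg
	if err := client.CallContext(ctx, leader, "Consensus", "MpoolPushMessage", msg, &signed); err != nil {
		return nil, fmt.Errorf("redirecting to leader %s: %w", leader, err)
	}
	return &signed, nil
}

On the server side, the removed NewRPCServer gated every incoming call through rpc.WithAuthorizeFunc backed by IsTrustedPeer, so only peers belonging to the raft cluster could push messages through the leader.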
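
The deleted raftWrapper.Bootstrap also followed the standard hashicorp/raft split between bootstrapping and joining: BootstrapCluster is only safe on a node with no persisted raft state, while a staging peer skips bootstrap entirely and waits for the leader to admit it via AddVoter (as AddPeer did above). A compressed sketch of that decision follows, assuming hraft aliases github.com/hashicorp/raft as in the removed file; bootstrapIfNew is a hypothetical condensation, not a Lotus function.

package sketch

import hraft "github.com/hashicorp/raft"

// bootstrapIfNew condenses the removed Bootstrap logic: never re-bootstrap a
// node with existing raft state (its persisted configuration wins and the
// configured InitPeerset is ignored), never bootstrap a staging peer (it
// joins later via AddVoter on the leader), and otherwise seed the cluster
// with the initial voter configuration. Returns whether bootstrap ran.
func bootstrapIfNew(r *hraft.Raft, logs hraft.LogStore, stable hraft.StableStore,
	snaps hraft.SnapshotStore, voters hraft.Configuration, staging bool) (bool, error) {
	hasState, err := hraft.HasExistingState(logs, stable, snaps)
	if err != nil || hasState || staging {
		return false, err
	}
	if err := r.BootstrapCluster(voters).Error(); err != nil {
		return true, err
	}
	return true, nil
}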