From 800d9de4d53f74a5725339307de70a7e76edce35 Mon Sep 17 00:00:00 2001
From: Shrenuj Bansal
Date: Mon, 14 Nov 2022 15:46:58 -0500
Subject: [PATCH] Address comments

---
 chain/messagepool/provider.go   |  1 -
 cli/cmd.go                      |  2 +-
 cli/util/api.go                 |  4 +--
 lib/consensus/raft/consensus.go | 56 ++-------------------------------
 lib/consensus/raft/raft.go      | 42 +++++--------------------
 lib/retry/retry.go              |  1 -
 node/builder_chain.go           |  6 ++--
 node/config/types.go            |  6 +---
 node/impl/full/raft.go          |  6 ++--
 9 files changed, 20 insertions(+), 104 deletions(-)

diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go
index 706ecc908..f8bbbc01e 100644
--- a/chain/messagepool/provider.go
+++ b/chain/messagepool/provider.go
@@ -12,7 +12,6 @@ import (
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/network"
 
-	//"github.com/filecoin-project/lotus/chain/messagesigner"
 	"github.com/filecoin-project/lotus/chain/stmgr"
 	"github.com/filecoin-project/lotus/chain/store"
 	"github.com/filecoin-project/lotus/chain/types"
diff --git a/cli/cmd.go b/cli/cmd.go
index 49d82e9d6..79023917b 100644
--- a/cli/cmd.go
+++ b/cli/cmd.go
@@ -50,7 +50,7 @@ var DaemonContext = cliutil.DaemonContext
 var ReqContext = cliutil.ReqContext
 
 var GetFullNodeAPI = cliutil.GetFullNodeAPI
-var GetFullNodeAPIV1 = cliutil.GetFullNodeAPIV1New
+var GetFullNodeAPIV1 = cliutil.GetFullNodeAPIV1
 var GetGatewayAPI = cliutil.GetGatewayAPI
 
 var GetStorageMinerAPI = cliutil.GetStorageMinerAPI
diff --git a/cli/util/api.go b/cli/util/api.go
index d3d0b4c6f..596322ab8 100644
--- a/cli/util/api.go
+++ b/cli/util/api.go
@@ -290,7 +290,7 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
 	}
 }
 
-func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
+func GetFullNodeAPIV1Single(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
 	if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
 		return tn.(v1api.FullNode), func() {}, nil
 	}
@@ -319,7 +319,7 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e
 	return v1API, closer, nil
 }
 
-func GetFullNodeAPIV1New(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
+func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
 	if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
 		return tn.(v1api.FullNode), func() {}, nil
 	}
diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go
index 93536f584..64efa1df1 100644
--- a/lib/consensus/raft/consensus.go
+++ b/lib/consensus/raft/consensus.go
@@ -32,9 +32,6 @@ import (
 
 var logger = logging.Logger("raft")
 
-//type NonceMapType map[addr.Address]uint64
-//type MsgUuidMapType map[uuid.UUID]*types.SignedMessage
-
 type RaftState struct {
 	NonceMap api.NonceMapType
 	MsgUuids api.MsgUuidMapType
@@ -101,7 +98,7 @@ func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
 var _ consensus.Op = &ConsensusOp{}
 
 // Consensus handles the work of keeping a shared-state between
-// the peers of an IPFS Cluster, as well as modifying that state and
+// the peers of a Lotus Cluster, as well as modifying that state and
 // applying any updates in a thread-safe manner.
 type Consensus struct {
 	ctx context.Context
@@ -121,9 +118,6 @@ type Consensus struct {
 
 	peerSet []peer.ID
 	repo    repo.LockedRepo
-
-	//shutdownLock sync.RWMutex
-	//shutdown bool
 }
 
 // NewConsensus builds a new ClusterConsensus component using Raft.
@@ -185,6 +179,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes
 }
 
+// TODO: Merge with NewConsensus and remove the rpcReady chan
 func NewConsensusWithRPCClient(staging bool) func(host host.Host,
 	cfg *ClusterRaftConfig,
 	rpcClient *rpc.Client,
@@ -205,8 +200,6 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host,
 
 // WaitForSync waits for a leader and for the state to be up to date, then returns.
 func (cc *Consensus) WaitForSync(ctx context.Context) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/WaitForSync")
-	//defer span.End()
 	leaderCtx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout)
 	defer cancel()
@@ -274,8 +267,6 @@ func (cc *Consensus) finishBootstrap() {
 // more updates. The underlying consensus is permanently
 // shutdown, along with the libp2p transport.
 func (cc *Consensus) Shutdown(ctx context.Context) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/Shutdown")
-	//defer span.End()
 
 	//cc.shutdownLock.Lock()
 	//defer cc.shutdownLock.Unlock()
@@ -309,9 +300,6 @@ func (cc *Consensus) Shutdown(ctx context.Context) error {
 
 // Ready returns a channel which is signaled when the Consensus
 // algorithm has finished bootstrapping and is ready to use
 func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} {
-	//_, span := trace.StartSpan(ctx, "consensus/Ready")
-	//defer span.End()
-
 	return cc.readyCh
 }
@@ -330,8 +318,6 @@ func (cc *Consensus) Distrust(ctx context.Context, pid peer.ID) error { return n
 // note that if the leader just dissappeared, the rpc call will
 // fail because we haven't heard that it's gone.
 func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error) {
-	//ctx, span := trace.StartSpan(cc.ctx, "consensus/RedirectToLeader")
-	//defer span.End()
 	ctx := cc.ctx
 
 	var finalErr error
@@ -389,17 +375,6 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf
 
 // commit submits a cc.consensus commit. It retries upon failures.
 func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/commit")
-	//defer span.End()
-	//
-	//if cc.config.Tracing {
-	//	// required to cross the serialized boundary
-	//	Op.SpanCtx = span.SpanContext()
-	//	tagmap := tag.FromContext(ctx)
-	//	if tagmap != nil {
-	//		Op.TagCtx = tag.Encode(tagmap)
-	//	}
-	//}
 
 	var finalErr error
 	for i := 0; i <= cc.config.CommitRetries; i++ {
@@ -411,17 +386,7 @@ func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
 				i, finalErr)
 		}
 
-		// try to send it to the leader
-		// RedirectToLeader has it's own retry loop. If this fails
-		// we're done here.
-
-		//ok, err := cc.RedirectToLeader(rpcOp, redirectArg, struct{}{})
-		//if err != nil || ok {
-		//	return err
-		//}
-		// Being here means we are the LEADER. We can commit.
-
-		// now commit the changes to our state
 		//cc.shutdownLock.RLock() // do not shut down while committing
 		_, finalErr = cc.consensus.CommitOp(op)
@@ -439,9 +404,6 @@ func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
 // AddPeer adds a new peer to participate in this consensus. It will
 // forward the operation to the leader if this is not it.
 func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/AddPeer")
-	//defer span.End()
-
 	var finalErr error
 	for i := 0; i <= cc.config.CommitRetries; i++ {
 		logger.Debugf("attempt #%d: AddPeer %s", i, pid.Pretty())
@@ -470,9 +432,6 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
 // RmPeer removes a peer from this consensus. It will
 // forward the operation to the leader if this is not it.
 func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/RmPeer")
-	//defer span.End()
-
 	var finalErr error
 	for i := 0; i <= cc.config.CommitRetries; i++ {
 		logger.Debugf("attempt #%d: RmPeer %s", i, pid.Pretty())
@@ -503,9 +462,6 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
 // writes to the shared state should happen through the Consensus component
 // methods.
 func (cc *Consensus) State(ctx context.Context) (*RaftState, error) {
-	//_, span := trace.StartSpan(ctx, "consensus/RaftState")
-	//defer span.End()
-
 	st, err := cc.consensus.GetLogHead()
 	if err == libp2praft.ErrNoState {
 		return newRaftState(nil), nil
@@ -524,9 +480,6 @@ func (cc *Consensus) State(ctx context.Context) (*RaftState, error) {
 // Leader returns the peerID of the Leader of the
 // cluster. It returns an error when there is no leader.
 func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {
-	//_, span := trace.StartSpan(ctx, "consensus/Leader")
-	//defer span.End()
-
 	// Note the hard-dependency on raft here...
 	raftactor := cc.actor.(*libp2praft.Actor)
 	return raftactor.Leader()
@@ -534,9 +487,6 @@ func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {
 
 // Clean removes the Raft persisted state.
 func (cc *Consensus) Clean(ctx context.Context) error {
-	//_, span := trace.StartSpan(ctx, "consensus/Clean")
-	//defer span.End()
-
 	//cc.shutdownLock.RLock()
 	//defer cc.shutdownLock.RUnlock()
 	//if !cc.shutdown {
@@ -560,8 +510,6 @@ func (cc *Consensus) Clean(ctx context.Context) error {
 // Peers return the current list of peers in the consensus.
 // The list will be sorted alphabetically.
 func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
-	//ctx, span := trace.StartSpan(ctx, "consensus/Peers")
-	//defer span.End()
 
 	//cc.shutdownLock.RLock() // prevent shutdown while here
 	//defer cc.shutdownLock.RUnlock()
diff --git a/lib/consensus/raft/raft.go b/lib/consensus/raft/raft.go
index d991e6956..b55e07957 100644
--- a/lib/consensus/raft/raft.go
+++ b/lib/consensus/raft/raft.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"go.uber.org/multierr"
 	"os"
 	"path/filepath"
 	"time"
@@ -264,9 +265,6 @@ func makeServerConf(peers []peer.ID) hraft.Configuration {
 // WaitForLeader holds until Raft says we have a leader.
 // Returns if ctx is canceled.
 func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
-	//ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForLeader")
-	//defer span.End()
-
 	ticker := time.NewTicker(time.Second / 2)
 	for {
 		select {
@@ -284,9 +282,6 @@ func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
 }
 
 func (rw *raftWrapper) WaitForVoter(ctx context.Context) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForVoter")
-	//defer span.End()
-
 	logger.Debug("waiting until we are promoted to a voter")
 
 	pid := hraft.ServerID(peer.Encode(rw.host.ID()))
@@ -322,8 +317,6 @@ func isVoter(srvID hraft.ServerID, cfg hraft.Configuration) bool {
 
 // WaitForUpdates holds until Raft has synced to the last index in the log
 func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForUpdates")
-	//defer span.End()
 
 	logger.Debug("Raft state is catching up to the latest known version. Please wait...")
 	for {
@@ -344,8 +337,6 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
 }
 
 func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForPeer")
-	//defer span.End()
 
 	for {
 		select {
@@ -422,40 +413,32 @@ func (rw *raftWrapper) snapshotOnShutdown() error {
 
 // Shutdown shutdown Raft and closes the BoltDB.
 func (rw *raftWrapper) Shutdown(ctx context.Context) error {
-	//_, span := trace.StartSpan(ctx, "consensus/raft/Shutdown")
-	//defer span.End()
-
-	errMsgs := ""
 	rw.cancel()
 
+	var finalErr error
+
 	err := rw.snapshotOnShutdown()
 	if err != nil {
-		errMsgs += err.Error() + ".\n"
+		finalErr = multierr.Append(finalErr, err)
 	}
 
 	future := rw.raft.Shutdown()
 	err = future.Error()
 	if err != nil {
-		errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
+		finalErr = multierr.Append(finalErr, err)
 	}
 
 	err = rw.boltdb.Close() // important!
 	if err != nil {
-		errMsgs += "could not close boltdb: " + err.Error()
+		finalErr = multierr.Append(finalErr, err)
 	}
 
-	if errMsgs != "" {
-		return errors.New(errMsgs)
-	}
-
-	return nil
+	return finalErr
 }
 
 // AddPeer adds a peer to Raft
 func (rw *raftWrapper) AddPeer(ctx context.Context, peerId peer.ID) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/raft/AddPeer")
-	//defer span.End()
 
 	// Check that we don't have it to not waste
 	// log entries if so.
@@ -488,9 +471,6 @@ func (rw *raftWrapper) AddPeer(ctx context.Context, peerId peer.ID) error {
 
 // RemovePeer removes a peer from Raft
 func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
-	//ctx, span := trace.StartSpan(ctx, "consensus/RemovePeer")
-	//defer span.End()
-
 	// Check that we have it to not waste
 	// log entries if we don't.
 	peers, err := rw.Peers(ctx)
@@ -510,7 +490,7 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
 		hraft.ServerID(peer),
 		0,
 		0,
-	) // TODO: Extra cfg value?
+	)
 	err = rmFuture.Error()
 	if err != nil {
 		logger.Error("raft cannot remove peer: ", err)
@@ -523,16 +503,10 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
 
 // Leader returns Raft's leader. It may be an empty string if
 // there is no leader or it is unknown.
 func (rw *raftWrapper) Leader(ctx context.Context) string {
-	//_, span := trace.StartSpan(ctx, "consensus/raft/Leader")
-	//defer span.End()
-
 	return string(rw.raft.Leader())
 }
 
 func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) {
-	//_, span := trace.StartSpan(ctx, "consensus/raft/Peers")
-	//defer span.End()
-
 	ids := make([]string, 0)
 	configFuture := rw.raft.GetConfiguration()
diff --git a/lib/retry/retry.go b/lib/retry/retry.go
index 1268b9655..897dbb06c 100644
--- a/lib/retry/retry.go
+++ b/lib/retry/retry.go
@@ -15,7 +15,6 @@ func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duratio
 	for i := 0; i < attempts; i++ {
 		if i > 0 {
 			log.Info("Retrying after error:", err)
-			//debug.PrintStack()
 			time.Sleep(initialBackoff)
 			initialBackoff *= 2
 		}
diff --git a/node/builder_chain.go b/node/builder_chain.go
index ee63067f5..2201be2e6 100644
--- a/node/builder_chain.go
+++ b/node/builder_chain.go
@@ -29,7 +29,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/wallet"
 	ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
 	"github.com/filecoin-project/lotus/chain/wallet/remotewallet"
-	consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft"
+	raftcns "github.com/filecoin-project/lotus/lib/consensus/raft"
 	"github.com/filecoin-project/lotus/lib/peermgr"
 	"github.com/filecoin-project/lotus/markets/retrievaladapter"
 	"github.com/filecoin-project/lotus/markets/storageadapter"
@@ -244,8 +244,8 @@ func ConfigFullNode(c interface{}) Option {
 		// Chain node cluster enabled
 		If(cfg.Cluster.ClusterModeEnabled,
 			Override(new(*gorpc.Client), modules.NewRPCClient),
-			Override(new(*consensus2.ClusterRaftConfig), consensus2.NewClusterRaftConfig(&cfg.Cluster)),
-			Override(new(*consensus2.Consensus), consensus2.NewConsensusWithRPCClient(false)),
+			Override(new(*raftcns.ClusterRaftConfig), raftcns.NewClusterRaftConfig(&cfg.Cluster)),
+			Override(new(*raftcns.Consensus), raftcns.NewConsensusWithRPCClient(false)),
 			Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus),
 			Override(new(messagesigner.MsgSigner), From(new(*messagesigner.MessageSignerConsensus))),
 			Override(new(*modules.RPCHandler), modules.NewRPCHandler),
diff --git a/node/config/types.go b/node/config/types.go
index d44597a31..1aa276782 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -603,7 +603,7 @@ type FeeConfig struct {
 }
 
 type UserRaftConfig struct {
-	// config to enabled node cluster with raft consensus
+	// EXPERIMENTAL. config to enabled node cluster with raft consensus
 	ClusterModeEnabled bool
 	// will shutdown libp2p host on shutdown. Useful for testing
 	HostShutdown bool
@@ -629,10 +629,6 @@ type UserRaftConfig struct {
 	BackupsRotate int
 	// Namespace to use when writing keys to the datastore
 	DatastoreNamespace string
-
-	// A Hashicorp Raft's configuration object.
-	//RaftConfig *hraft.Config
-
 	// Tracing enables propagation of contexts across binary boundaries.
 	Tracing bool
 }
diff --git a/node/impl/full/raft.go b/node/impl/full/raft.go
index b5019c9ab..8d665ddd5 100644
--- a/node/impl/full/raft.go
+++ b/node/impl/full/raft.go
@@ -19,7 +19,7 @@ type RaftAPI struct {
 
 func (r *RaftAPI) GetRaftState(ctx context.Context) (*api.RaftStateData, error) {
 	if r.MessageSigner == nil {
-		return nil, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
+		return nil, xerrors.Errorf("raft consensus not enabled. Please check your configuration")
 	}
 	raftState, err := r.MessageSigner.GetRaftState(ctx)
 	if err != nil {
@@ -30,7 +30,7 @@ func (r *RaftAPI) GetRaftState(ctx context.Context) (*api.RaftStateData, error)
 
 func (r *RaftAPI) Leader(ctx context.Context) (peer.ID, error) {
 	if r.MessageSigner == nil {
-		return "", xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
+		return "", xerrors.Errorf("raft consensus not enabled. Please check your configuration")
 	}
 	return r.MessageSigner.Leader(ctx)
 }
@@ -44,7 +44,7 @@ func (r *RaftAPI) IsLeader(ctx context.Context) bool {
 
 func (r *RaftAPI) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
 	if r.MessageSigner == nil {
-		return false, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
+		return false, xerrors.Errorf("raft consensus not enabled. Please check your configuration")
 	}
 	return r.MessageSigner.RedirectToLeader(ctx, method, arg, ret)
 }