Address comments
This commit is contained in:
parent b541cf919d
commit 800d9de4d5
@@ -12,7 +12,6 @@ import (
    "github.com/filecoin-project/go-state-types/abi"
    "github.com/filecoin-project/go-state-types/network"

    //"github.com/filecoin-project/lotus/chain/messagesigner"
    "github.com/filecoin-project/lotus/chain/stmgr"
    "github.com/filecoin-project/lotus/chain/store"
    "github.com/filecoin-project/lotus/chain/types"
@@ -50,7 +50,7 @@ var DaemonContext = cliutil.DaemonContext
var ReqContext = cliutil.ReqContext

var GetFullNodeAPI = cliutil.GetFullNodeAPI
var GetFullNodeAPIV1 = cliutil.GetFullNodeAPIV1New
var GetFullNodeAPIV1 = cliutil.GetFullNodeAPIV1
var GetGatewayAPI = cliutil.GetGatewayAPI

var GetStorageMinerAPI = cliutil.GetStorageMinerAPI
@@ -290,7 +290,7 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
    }
}

func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
func GetFullNodeAPIV1Single(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
    if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
        return tn.(v1api.FullNode), func() {}, nil
    }
@@ -319,7 +319,7 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e
    return v1API, closer, nil
}

func GetFullNodeAPIV1New(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
    if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
        return tn.(v1api.FullNode), func() {}, nil
    }
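For context, a hypothetical CLI action using the renamed helper might look like the sketch below. The command, its name, and the ChainHead call are illustrative and not part of this commit; only the GetFullNodeAPIV1 signature comes from the diff above.

// Hypothetical usage sketch, assumed to live in the lotus cli package.
package cli

import (
    "fmt"

    "github.com/urfave/cli/v2"
)

var exampleChainHeadCmd = &cli.Command{
    Name: "example-chain-head",
    Action: func(cctx *cli.Context) error {
        // GetFullNodeAPIV1 (post-rename) returns the v1 API plus a closer.
        api, closer, err := GetFullNodeAPIV1(cctx)
        if err != nil {
            return err
        }
        defer closer()

        head, err := api.ChainHead(cctx.Context)
        if err != nil {
            return err
        }
        fmt.Println("head height:", head.Height())
        return nil
    },
}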
@@ -32,9 +32,6 @@ import (

var logger = logging.Logger("raft")

//type NonceMapType map[addr.Address]uint64
//type MsgUuidMapType map[uuid.UUID]*types.SignedMessage

type RaftState struct {
    NonceMap api.NonceMapType
    MsgUuids api.MsgUuidMapType
@@ -101,7 +98,7 @@ func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
var _ consensus.Op = &ConsensusOp{}

// Consensus handles the work of keeping a shared-state between
// the peers of an IPFS Cluster, as well as modifying that state and
// the peers of a Lotus Cluster, as well as modifying that state and
// applying any updates in a thread-safe manner.
type Consensus struct {
    ctx context.Context
@@ -121,9 +118,6 @@ type Consensus struct {

    peerSet []peer.ID
    repo repo.LockedRepo

    //shutdownLock sync.RWMutex
    //shutdown bool
}

// NewConsensus builds a new ClusterConsensus component using Raft.
@@ -185,6 +179,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes

}

// TODO: Merge with NewConsensus and remove the rpcReady chan
func NewConsensusWithRPCClient(staging bool) func(host host.Host,
    cfg *ClusterRaftConfig,
    rpcClient *rpc.Client,
@@ -205,8 +200,6 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host,

// WaitForSync waits for a leader and for the state to be up to date, then returns.
func (cc *Consensus) WaitForSync(ctx context.Context) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/WaitForSync")
    //defer span.End()

    leaderCtx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout)
    defer cancel()
@@ -274,8 +267,6 @@ func (cc *Consensus) finishBootstrap() {
// more updates. The underlying consensus is permanently
// shutdown, along with the libp2p transport.
func (cc *Consensus) Shutdown(ctx context.Context) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/Shutdown")
    //defer span.End()

    //cc.shutdownLock.Lock()
    //defer cc.shutdownLock.Unlock()
@@ -309,9 +300,6 @@ func (cc *Consensus) Shutdown(ctx context.Context) error {
// Ready returns a channel which is signaled when the Consensus
// algorithm has finished bootstrapping and is ready to use
func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} {
    //_, span := trace.StartSpan(ctx, "consensus/Ready")
    //defer span.End()

    return cc.readyCh
}

@@ -330,8 +318,6 @@ func (cc *Consensus) Distrust(ctx context.Context, pid peer.ID) error { return n
// note that if the leader just dissappeared, the rpc call will
// fail because we haven't heard that it's gone.
func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error) {
    //ctx, span := trace.StartSpan(cc.ctx, "consensus/RedirectToLeader")
    //defer span.End()
    ctx := cc.ctx

    var finalErr error
@@ -389,17 +375,6 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf

// commit submits a cc.consensus commit. It retries upon failures.
func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/commit")
    //defer span.End()
    //
    //if cc.config.Tracing {
    // // required to cross the serialized boundary
    // Op.SpanCtx = span.SpanContext()
    // tagmap := tag.FromContext(ctx)
    // if tagmap != nil {
    // Op.TagCtx = tag.Encode(tagmap)
    // }
    //}

    var finalErr error
    for i := 0; i <= cc.config.CommitRetries; i++ {
@@ -411,17 +386,7 @@ func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
                i, finalErr)
        }

        // try to send it to the leader
        // RedirectToLeader has it's own retry loop. If this fails
        // we're done here.

        //ok, err := cc.RedirectToLeader(rpcOp, redirectArg, struct{}{})
        //if err != nil || ok {
        // return err
        //}

        // Being here means we are the LEADER. We can commit.

        // now commit the changes to our state
        //cc.shutdownLock.RLock() // do not shut down while committing
        _, finalErr = cc.consensus.CommitOp(op)
@@ -439,9 +404,6 @@ func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error {
// AddPeer adds a new peer to participate in this consensus. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/AddPeer")
    //defer span.End()

    var finalErr error
    for i := 0; i <= cc.config.CommitRetries; i++ {
        logger.Debugf("attempt #%d: AddPeer %s", i, pid.Pretty())
@@ -470,9 +432,6 @@ func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error {
// RmPeer removes a peer from this consensus. It will
// forward the operation to the leader if this is not it.
func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/RmPeer")
    //defer span.End()

    var finalErr error
    for i := 0; i <= cc.config.CommitRetries; i++ {
        logger.Debugf("attempt #%d: RmPeer %s", i, pid.Pretty())
@@ -503,9 +462,6 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
// writes to the shared state should happen through the Consensus component
// methods.
func (cc *Consensus) State(ctx context.Context) (*RaftState, error) {
    //_, span := trace.StartSpan(ctx, "consensus/RaftState")
    //defer span.End()

    st, err := cc.consensus.GetLogHead()
    if err == libp2praft.ErrNoState {
        return newRaftState(nil), nil
@@ -524,9 +480,6 @@ func (cc *Consensus) State(ctx context.Context) (*RaftState, error) {
// Leader returns the peerID of the Leader of the
// cluster. It returns an error when there is no leader.
func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {
    //_, span := trace.StartSpan(ctx, "consensus/Leader")
    //defer span.End()

    // Note the hard-dependency on raft here...
    raftactor := cc.actor.(*libp2praft.Actor)
    return raftactor.Leader()
@@ -534,9 +487,6 @@ func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error) {

// Clean removes the Raft persisted state.
func (cc *Consensus) Clean(ctx context.Context) error {
    //_, span := trace.StartSpan(ctx, "consensus/Clean")
    //defer span.End()

    //cc.shutdownLock.RLock()
    //defer cc.shutdownLock.RUnlock()
    //if !cc.shutdown {
@@ -560,8 +510,6 @@ func (cc *Consensus) Clean(ctx context.Context) error {
// Peers return the current list of peers in the consensus.
// The list will be sorted alphabetically.
func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error) {
    //ctx, span := trace.StartSpan(ctx, "consensus/Peers")
    //defer span.End()

    //cc.shutdownLock.RLock() // prevent shutdown while here
    //defer cc.shutdownLock.RUnlock()
@@ -4,6 +4,7 @@ import (
    "context"
    "errors"
    "fmt"
    "go.uber.org/multierr"
    "os"
    "path/filepath"
    "time"
@@ -264,9 +265,6 @@ func makeServerConf(peers []peer.ID) hraft.Configuration {
// WaitForLeader holds until Raft says we have a leader.
// Returns if ctx is canceled.
func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
    //ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForLeader")
    //defer span.End()

    ticker := time.NewTicker(time.Second / 2)
    for {
        select {
@@ -284,9 +282,6 @@ func (rw *raftWrapper) WaitForLeader(ctx context.Context) (string, error) {
}

func (rw *raftWrapper) WaitForVoter(ctx context.Context) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForVoter")
    //defer span.End()

    logger.Debug("waiting until we are promoted to a voter")

    pid := hraft.ServerID(peer.Encode(rw.host.ID()))
@@ -322,8 +317,6 @@ func isVoter(srvID hraft.ServerID, cfg hraft.Configuration) bool {

// WaitForUpdates holds until Raft has synced to the last index in the log
func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForUpdates")
    //defer span.End()

    logger.Debug("Raft state is catching up to the latest known version. Please wait...")
    for {
@@ -344,8 +337,6 @@ func (rw *raftWrapper) WaitForUpdates(ctx context.Context) error {
}

func (rw *raftWrapper) WaitForPeer(ctx context.Context, pid string, depart bool) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/raft/WaitForPeer")
    //defer span.End()

    for {
        select {
@@ -422,40 +413,32 @@ func (rw *raftWrapper) snapshotOnShutdown() error {

// Shutdown shutdown Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown(ctx context.Context) error {
    //_, span := trace.StartSpan(ctx, "consensus/raft/Shutdown")
    //defer span.End()

    errMsgs := ""

    rw.cancel()

    var finalErr error

    err := rw.snapshotOnShutdown()
    if err != nil {
        errMsgs += err.Error() + ".\n"
        finalErr = multierr.Append(finalErr, err)
    }

    future := rw.raft.Shutdown()
    err = future.Error()
    if err != nil {
        errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
        finalErr = multierr.Append(finalErr, err)
    }

    err = rw.boltdb.Close() // important!
    if err != nil {
        errMsgs += "could not close boltdb: " + err.Error()
        finalErr = multierr.Append(finalErr, err)
    }

    if errMsgs != "" {
        return errors.New(errMsgs)
    }

    return nil
    return finalErr
}

// AddPeer adds a peer to Raft
func (rw *raftWrapper) AddPeer(ctx context.Context, peerId peer.ID) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/raft/AddPeer")
    //defer span.End()

    // Check that we don't have it to not waste
    // log entries if so.
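The Shutdown hunk above replaces string concatenation of error messages with go.uber.org/multierr accumulation. A minimal, self-contained sketch of that pattern follows; the closer functions are hypothetical stand-ins, not code from this diff.

package main

import (
    "errors"
    "fmt"

    "go.uber.org/multierr"
)

func snapshotOnShutdown() error { return nil }
func shutdownRaft() error       { return errors.New("could not shutdown raft: timeout") }
func closeBoltDB() error        { return errors.New("could not close boltdb: file locked") }

// shutdown runs every cleanup step regardless of earlier failures and
// returns the accumulated error (nil only if every step succeeded).
func shutdown() error {
    var finalErr error
    finalErr = multierr.Append(finalErr, snapshotOnShutdown())
    finalErr = multierr.Append(finalErr, shutdownRaft())
    finalErr = multierr.Append(finalErr, closeBoltDB())
    return finalErr
}

func main() {
    if err := shutdown(); err != nil {
        fmt.Println(err) // both failure messages, combined by multierr
    }
}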
@@ -488,9 +471,6 @@ func (rw *raftWrapper) AddPeer(ctx context.Context, peerId peer.ID) error {

// RemovePeer removes a peer from Raft
func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
    //ctx, span := trace.StartSpan(ctx, "consensus/RemovePeer")
    //defer span.End()

    // Check that we have it to not waste
    // log entries if we don't.
    peers, err := rw.Peers(ctx)
@@ -510,7 +490,7 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
        hraft.ServerID(peer),
        0,
        0,
    ) // TODO: Extra cfg value?
    )
    err = rmFuture.Error()
    if err != nil {
        logger.Error("raft cannot remove peer: ", err)
@@ -523,16 +503,10 @@ func (rw *raftWrapper) RemovePeer(ctx context.Context, peer string) error {
// Leader returns Raft's leader. It may be an empty string if
// there is no leader or it is unknown.
func (rw *raftWrapper) Leader(ctx context.Context) string {
    //_, span := trace.StartSpan(ctx, "consensus/raft/Leader")
    //defer span.End()

    return string(rw.raft.Leader())
}

func (rw *raftWrapper) Peers(ctx context.Context) ([]string, error) {
    //_, span := trace.StartSpan(ctx, "consensus/raft/Peers")
    //defer span.End()

    ids := make([]string, 0)

    configFuture := rw.raft.GetConfiguration()
@@ -15,7 +15,6 @@ func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duratio
    for i := 0; i < attempts; i++ {
        if i > 0 {
            log.Info("Retrying after error:", err)
            //debug.PrintStack()
            time.Sleep(initialBackoff)
            initialBackoff *= 2
        }
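The hunk above trims a commented-out stack dump from a generic retry helper that doubles its backoff between attempts. The following is a self-contained sketch of that shape with a simplified, assumed signature; it is not the repository's actual Retry function.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retry calls f up to attempts times, sleeping between tries and doubling
// the backoff after each failure. It stops early if the context is done.
func retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, f func() (T, error)) (T, error) {
    var zero T
    var err error
    for i := 0; i < attempts; i++ {
        if i > 0 {
            fmt.Println("retrying after error:", err)
            time.Sleep(initialBackoff)
            initialBackoff *= 2
        }
        var res T
        res, err = f()
        if err == nil {
            return res, nil
        }
        if ctx.Err() != nil {
            return zero, ctx.Err()
        }
    }
    return zero, fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
    calls := 0
    v, err := retry(context.Background(), 3, 100*time.Millisecond, func() (int, error) {
        calls++
        if calls < 3 {
            return 0, errors.New("not ready yet")
        }
        return 42, nil
    })
    fmt.Println(v, err) // 42 <nil> after two retries
}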
@@ -29,7 +29,7 @@ import (
    "github.com/filecoin-project/lotus/chain/wallet"
    ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
    "github.com/filecoin-project/lotus/chain/wallet/remotewallet"
    consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft"
    raftcns "github.com/filecoin-project/lotus/lib/consensus/raft"
    "github.com/filecoin-project/lotus/lib/peermgr"
    "github.com/filecoin-project/lotus/markets/retrievaladapter"
    "github.com/filecoin-project/lotus/markets/storageadapter"
@@ -244,8 +244,8 @@ func ConfigFullNode(c interface{}) Option {
        // Chain node cluster enabled
        If(cfg.Cluster.ClusterModeEnabled,
            Override(new(*gorpc.Client), modules.NewRPCClient),
            Override(new(*consensus2.ClusterRaftConfig), consensus2.NewClusterRaftConfig(&cfg.Cluster)),
            Override(new(*consensus2.Consensus), consensus2.NewConsensusWithRPCClient(false)),
            Override(new(*raftcns.ClusterRaftConfig), raftcns.NewClusterRaftConfig(&cfg.Cluster)),
            Override(new(*raftcns.Consensus), raftcns.NewConsensusWithRPCClient(false)),
            Override(new(*messagesigner.MessageSignerConsensus), messagesigner.NewMessageSignerConsensus),
            Override(new(messagesigner.MsgSigner), From(new(*messagesigner.MessageSignerConsensus))),
            Override(new(*modules.RPCHandler), modules.NewRPCHandler),
@@ -603,7 +603,7 @@ type FeeConfig struct {
}

type UserRaftConfig struct {
    // config to enabled node cluster with raft consensus
    // EXPERIMENTAL. config to enabled node cluster with raft consensus
    ClusterModeEnabled bool
    // will shutdown libp2p host on shutdown. Useful for testing
    HostShutdown bool
@@ -629,10 +629,6 @@ type UserRaftConfig struct {
    BackupsRotate int
    // Namespace to use when writing keys to the datastore
    DatastoreNamespace string

    // A Hashicorp Raft's configuration object.
    //RaftConfig *hraft.Config

    // Tracing enables propagation of contexts across binary boundaries.
    Tracing bool
}
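For orientation, the experimental cluster section above can be toggled from Go roughly as in the hypothetical sketch below; it assumes config.DefaultFullNode and the Cluster field exist as in the lotus tree this diff targets, and the namespace value is made up for illustration.

package main

import (
    "fmt"

    "github.com/filecoin-project/lotus/node/config"
)

func main() {
    cfg := config.DefaultFullNode()
    // Enable the EXPERIMENTAL raft-based node cluster described above.
    cfg.Cluster.ClusterModeEnabled = true
    cfg.Cluster.DatastoreNamespace = "/raft-cluster-example" // hypothetical value
    fmt.Println("cluster mode enabled:", cfg.Cluster.ClusterModeEnabled)
}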
@@ -19,7 +19,7 @@ type RaftAPI struct {

func (r *RaftAPI) GetRaftState(ctx context.Context) (*api.RaftStateData, error) {
    if r.MessageSigner == nil {
        return nil, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
        return nil, xerrors.Errorf("raft consensus not enabled. Please check your configuration")
    }
    raftState, err := r.MessageSigner.GetRaftState(ctx)
    if err != nil {
@@ -30,7 +30,7 @@ func (r *RaftAPI) GetRaftState(ctx context.Context) (*api.RaftStateData, error)

func (r *RaftAPI) Leader(ctx context.Context) (peer.ID, error) {
    if r.MessageSigner == nil {
        return "", xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
        return "", xerrors.Errorf("raft consensus not enabled. Please check your configuration")
    }
    return r.MessageSigner.Leader(ctx)
}
@@ -44,7 +44,7 @@ func (r *RaftAPI) IsLeader(ctx context.Context) bool {

func (r *RaftAPI) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
    if r.MessageSigner == nil {
        return false, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
        return false, xerrors.Errorf("raft consensus not enabled. Please check your configuration")
    }
    return r.MessageSigner.RedirectToLeader(ctx, method, arg, ret)
}