diff --git a/go.mod b/go.mod index e477b0d38..db6674b56 100644 --- a/go.mod +++ b/go.mod @@ -69,6 +69,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/raft v1.1.1 + github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94 github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab github.com/ipfs/bbloom v0.0.4 @@ -148,9 +149,11 @@ require ( go.opentelemetry.io/otel/bridge/opencensus v0.25.0 go.opentelemetry.io/otel/exporters/jaeger v1.2.0 go.opentelemetry.io/otel/sdk v1.2.0 + go.uber.org/atomic v1.10.0 go.uber.org/fx v1.15.0 go.uber.org/multierr v1.8.0 go.uber.org/zap v1.22.0 + golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 golang.org/x/net v0.0.0-20220812174116-3211cb980234 golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab @@ -173,6 +176,7 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bep/debounce v1.2.0 // indirect + github.com/boltdb/bolt v1.3.1 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cheekybits/genny v1.0.0 // indirect @@ -325,11 +329,9 @@ require ( go.opentelemetry.io/otel/metric v0.25.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.12.0 // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect - golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index 90392fc2e..d94124286 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ= @@ -636,6 +637,7 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/hodgesds/perf-utils v0.0.8/go.mod h1:F6TfvsbtrF88i++hou29dTXlI2sfsJv+gRZDtmTJkAs= diff --git a/lib/consensus/raft/config.go b/lib/consensus/raft/config.go index 3153d2a33..541f43e16 100644 --- a/lib/consensus/raft/config.go +++ b/lib/consensus/raft/config.go @@ -1,7 +1,9 @@ package consensus import ( + "github.com/filecoin-project/lotus/node/repo" "io/ioutil" + "path/filepath" "time" hraft "github.com/hashicorp/raft" @@ -18,7 +20,7 @@ var envConfigKey = "cluster_raft" // Configuration defaults var ( - DefaultDataSubFolder = "raft" + DefaultDataSubFolder = "raft-cluster" DefaultWaitForLeaderTimeout = 15 * time.Second DefaultCommitRetries = 1 DefaultNetworkTimeout = 100 * time.Second @@ -350,13 +352,14 @@ func ValidateConfig(cfg *ClusterRaftConfig) error { // return cfg.applyJSONConfig(jcfg) //} // -//// GetDataFolder returns the Raft data folder that we are using. -//func (cfg *Config) GetDataFolder() string { -// if cfg.DataFolder == "" { -// return filepath.Join(cfg.BaseDir, DefaultDataSubFolder) -// } -// return cfg.DataFolder -//} +// GetDataFolder returns the Raft data folder that we are using. +func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string { + if cfg.DataFolder == "" { + return filepath.Join(repo.Path() + DefaultDataSubFolder) + } + return filepath.Join(repo.Path() + cfg.DataFolder) +} + // //// ToDisplayJSON returns JSON config as a string. //func (cfg *Config) ToDisplayJSON() ([]byte, error) { diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go index 464214308..07e9136c0 100644 --- a/lib/consensus/raft/consensus.go +++ b/lib/consensus/raft/consensus.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/filecoin-project/lotus/node/repo" "sort" "time" @@ -83,6 +84,7 @@ type Consensus struct { readyCh chan struct{} peerSet []peer.ID + repo repo.LockedRepo //shutdownLock sync.RWMutex //shutdown bool @@ -96,7 +98,7 @@ type Consensus struct { // // The staging parameter controls if the Raft peer should start in // staging mode (used when joining a new Raft peerset with other peers). -func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, staging bool) (*Consensus, error) { +func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, repo repo.LockedRepo, staging bool) (*Consensus, error) { err := ValidateConfig(cfg) if err != nil { return nil, err @@ -109,7 +111,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes consensus := libp2praft.NewOpLog(state, &ConsensusOp{}) - raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging) + raft, err := newRaftWrapper(host, cfg, consensus.FSM(), repo, staging) if err != nil { logger.Error("error creating raft: ", err) cancel() @@ -130,6 +132,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes peerSet: cfg.InitPeerset, rpcReady: make(chan struct{}, 1), readyCh: make(chan struct{}, 1), + repo: repo, } go cc.finishBootstrap() @@ -141,10 +144,11 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool, + repo repo.LockedRepo, ) (*Consensus, error) { - return func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool) (*Consensus, error) { - cc, err := NewConsensus(host, cfg, mpool, staging) + return func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool, repo repo.LockedRepo) (*Consensus, error) { + cc, err := NewConsensus(host, cfg, mpool, repo, staging) if err != nil { return nil, err } diff --git a/lib/consensus/raft/raft.go b/lib/consensus/raft/raft.go index 245ae168d..1fb6195de 100644 --- a/lib/consensus/raft/raft.go +++ b/lib/consensus/raft/raft.go @@ -4,16 +4,23 @@ import ( "context" "errors" "fmt" + "github.com/filecoin-project/lotus/node/repo" + "github.com/ipfs/go-log/v2" + "go.uber.org/zap" "io" "os" + "path/filepath" "time" hraft "github.com/hashicorp/raft" + raftboltdb "github.com/hashicorp/raft-boltdb" p2praft "github.com/libp2p/go-libp2p-raft" host "github.com/libp2p/go-libp2p/core/host" peer "github.com/libp2p/go-libp2p/core/peer" ) +var raftLogger = log.Logger("raft-cluster") + // ErrWaitingForSelf is returned when we are waiting for ourselves to depart // the peer set, which won't happen var errWaitingForSelf = errors.New("waiting for ourselves to depart") @@ -49,8 +56,9 @@ type raftWrapper struct { snapshotStore hraft.SnapshotStore logStore hraft.LogStore stableStore hraft.StableStore - //boltdb *raftboltdb.BoltStore - staging bool + boltdb *raftboltdb.BoltStore + repo repo.LockedRepo + staging bool } // newRaftWrapper creates a Raft instance and initializes @@ -60,6 +68,7 @@ func newRaftWrapper( host host.Host, cfg *ClusterRaftConfig, fsm hraft.FSM, + repo repo.LockedRepo, staging bool, ) (*raftWrapper, error) { @@ -67,18 +76,19 @@ func newRaftWrapper( raftW.config = cfg raftW.host = host raftW.staging = staging + raftW.repo = repo // Set correct LocalID cfg.RaftConfig.LocalID = hraft.ServerID(peer.Encode(host.ID())) - //df := cfg.GetDataFolder() - //err := makeDataFolder(df) - //if err != nil { - // return nil, err - //} + df := cfg.GetDataFolder(repo) + err := makeDataFolder(df) + if err != nil { + return nil, err + } raftW.makeServerConfig() - err := raftW.makeTransport() + err = raftW.makeTransport() if err != nil { return nil, err } @@ -124,13 +134,13 @@ func (rw *raftWrapper) makeTransport() (err error) { } func (rw *raftWrapper) makeStores() error { - //logger.Debug("creating BoltDB store") - //df := rw.config.GetDataFolder() - //store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db")) - //if err != nil { - // return err - //} - store := hraft.NewInmemStore() + logger.Debug("creating BoltDB store") + df := rw.config.GetDataFolder(rw.repo) + store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db")) + if err != nil { + return err + } + //store := hraft.NewInmemStore() // wraps the store in a LogCache to improve performance. // See consul/agent/consul/server.go cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store) @@ -138,22 +148,22 @@ func (rw *raftWrapper) makeStores() error { return err } - //logger.Debug("creating raft snapshot store") - //snapstore, err := hraft.NewFileSnapshotStoreWithLogger( - // df, - // RaftMaxSnapshots, - // raftStdLogger, - //) + logger.Debug("creating raft snapshot store") + snapstore, err := hraft.NewFileSnapshotStoreWithLogger( + df, + RaftMaxSnapshots, + zap.NewStdLog(log.Logger("raft-snapshot").SugaredLogger.Desugar()), + ) + if err != nil { + return err + } - snapstore := hraft.NewInmemSnapshotStore() - //if err != nil { - // return err - //} + //snapstore := hraft.NewInmemSnapshotStore() rw.logStore = cacheStore rw.stableStore = store rw.snapshotStore = snapstore - //rw.boltdb = store + rw.boltdb = store return nil } @@ -420,10 +430,10 @@ func (rw *raftWrapper) Shutdown(ctx context.Context) error { errMsgs += "could not shutdown raft: " + err.Error() + ".\n" } - //err = rw.boltdb.Close() // important! - //if err != nil { - // errMsgs += "could not close boltdb: " + err.Error() - //} + err = rw.boltdb.Close() // important! + if err != nil { + errMsgs += "could not close boltdb: " + err.Error() + } if errMsgs != "" { return errors.New(errMsgs)