Add persistent stores for cluster raft data
commit b8060cd8f7 (parent f89a682d98)
go.mod (6 changed lines)
@@ -69,6 +69,7 @@ require (
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/hashicorp/golang-lru v0.5.4
 	github.com/hashicorp/raft v1.1.1
+	github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
 	github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94
 	github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab
 	github.com/ipfs/bbloom v0.0.4
@@ -148,9 +149,11 @@ require (
 	go.opentelemetry.io/otel/bridge/opencensus v0.25.0
 	go.opentelemetry.io/otel/exporters/jaeger v1.2.0
 	go.opentelemetry.io/otel/sdk v1.2.0
+	go.uber.org/atomic v1.10.0
 	go.uber.org/fx v1.15.0
 	go.uber.org/multierr v1.8.0
 	go.uber.org/zap v1.22.0
+	golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5
 	golang.org/x/net v0.0.0-20220812174116-3211cb980234
 	golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
 	golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab
@@ -173,6 +176,7 @@ require (
 	github.com/benbjohnson/clock v1.3.0 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/bep/debounce v1.2.0 // indirect
+	github.com/boltdb/bolt v1.3.1 // indirect
 	github.com/cespare/xxhash v1.1.0 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/cheekybits/genny v1.0.0 // indirect
@@ -325,11 +329,9 @@ require (
 	go.opentelemetry.io/otel/metric v0.25.0 // indirect
 	go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
 	go.opentelemetry.io/otel/trace v1.7.0 // indirect
-	go.uber.org/atomic v1.10.0 // indirect
 	go.uber.org/dig v1.12.0 // indirect
 	go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
 	golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
-	golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect
 	golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
 	golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
 	golang.org/x/text v0.3.7 // indirect
go.sum (2 changed lines)
@@ -125,6 +125,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
 github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
 github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
 github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
 github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
 github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ=
@@ -636,6 +637,7 @@ github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0m
 github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs=
 github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
+github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
 github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
 github.com/hodgesds/perf-utils v0.0.8/go.mod h1:F6TfvsbtrF88i++hou29dTXlI2sfsJv+gRZDtmTJkAs=
@@ -1,7 +1,9 @@
 package consensus

 import (
+	"github.com/filecoin-project/lotus/node/repo"
 	"io/ioutil"
+	"path/filepath"
 	"time"

 	hraft "github.com/hashicorp/raft"
@@ -18,7 +20,7 @@ var envConfigKey = "cluster_raft"

 // Configuration defaults
 var (
-	DefaultDataSubFolder        = "raft"
+	DefaultDataSubFolder        = "raft-cluster"
 	DefaultWaitForLeaderTimeout = 15 * time.Second
 	DefaultCommitRetries        = 1
 	DefaultNetworkTimeout       = 100 * time.Second
@@ -350,13 +352,14 @@ func ValidateConfig(cfg *ClusterRaftConfig) error {
 //	return cfg.applyJSONConfig(jcfg)
 //}
 //
-//// GetDataFolder returns the Raft data folder that we are using.
-//func (cfg *Config) GetDataFolder() string {
-//	if cfg.DataFolder == "" {
-//		return filepath.Join(cfg.BaseDir, DefaultDataSubFolder)
-//	}
-//	return cfg.DataFolder
-//}
+// GetDataFolder returns the Raft data folder that we are using.
+func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string {
+	if cfg.DataFolder == "" {
+		return filepath.Join(repo.Path() + DefaultDataSubFolder)
+	}
+	return filepath.Join(repo.Path() + cfg.DataFolder)
+}
+
 //
 //// ToDisplayJSON returns JSON config as a string.
 //func (cfg *Config) ToDisplayJSON() ([]byte, error) {
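
Illustration only, not part of the diff: the new GetDataFolder builds the path by string-concatenating repo.Path() with the folder name inside a single filepath.Join argument, so Join only cleans the result rather than inserting a separator. A minimal sketch, assuming a made-up repo path of /tmp/lotus-repo:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	repoPath := "/tmp/lotus-repo" // hypothetical repo.Path() value
	sub := "raft-cluster"         // DefaultDataSubFolder after this change

	// Single argument: Join just cleans the concatenated string.
	fmt.Println(filepath.Join(repoPath + sub)) // /tmp/lotus-reporaft-cluster
	// Separate arguments: Join inserts the OS path separator.
	fmt.Println(filepath.Join(repoPath, sub)) // /tmp/lotus-repo/raft-cluster
}

(Output shown for a Unix-style path.)
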
@@ -6,6 +6,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/filecoin-project/lotus/node/repo"
 	"sort"
 	"time"

@@ -83,6 +84,7 @@ type Consensus struct {
 	readyCh chan struct{}

 	peerSet []peer.ID
+	repo    repo.LockedRepo

 	//shutdownLock sync.RWMutex
 	//shutdown bool
@@ -96,7 +98,7 @@ type Consensus struct {
 //
 // The staging parameter controls if the Raft peer should start in
 // staging mode (used when joining a new Raft peerset with other peers).
-func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, staging bool) (*Consensus, error) {
+func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, repo repo.LockedRepo, staging bool) (*Consensus, error) {
 	err := ValidateConfig(cfg)
 	if err != nil {
 		return nil, err
@@ -109,7 +111,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes

 	consensus := libp2praft.NewOpLog(state, &ConsensusOp{})

-	raft, err := newRaftWrapper(host, cfg, consensus.FSM(), staging)
+	raft, err := newRaftWrapper(host, cfg, consensus.FSM(), repo, staging)
 	if err != nil {
 		logger.Error("error creating raft: ", err)
 		cancel()
@@ -130,6 +132,7 @@ func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.Mes
 		peerSet:  cfg.InitPeerset,
 		rpcReady: make(chan struct{}, 1),
 		readyCh:  make(chan struct{}, 1),
+		repo:     repo,
 	}

 	go cc.finishBootstrap()
@@ -141,10 +144,11 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host,
 	cfg *ClusterRaftConfig,
 	rpcClient *rpc.Client,
 	mpool *messagepool.MessagePool,
+	repo repo.LockedRepo,
 ) (*Consensus, error) {

-	return func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool) (*Consensus, error) {
-		cc, err := NewConsensus(host, cfg, mpool, staging)
+	return func(host host.Host, cfg *ClusterRaftConfig, rpcClient *rpc.Client, mpool *messagepool.MessagePool, repo repo.LockedRepo) (*Consensus, error) {
+		cc, err := NewConsensus(host, cfg, mpool, repo, staging)
 		if err != nil {
 			return nil, err
 		}
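
A rough sketch, not from this commit, of how a caller might satisfy the new repo.LockedRepo parameter. The repo path and the repo.NewFS / Lock wiring are assumptions for illustration; only the NewConsensus signature comes from the diff above:

// startConsensus is hypothetical; host, config and message pool construction
// are assumed to happen elsewhere in the caller.
func startConsensus(h host.Host, cfg *ClusterRaftConfig, mp *messagepool.MessagePool) (*Consensus, error) {
	fsRepo, err := repo.NewFS("/tmp/lotus-repo") // assumed on-disk repo location
	if err != nil {
		return nil, err
	}
	lockedRepo, err := fsRepo.Lock(repo.FullNode) // NewConsensus now needs a LockedRepo
	if err != nil {
		return nil, err
	}
	cc, err := NewConsensus(h, cfg, mp, lockedRepo, false) // staging = false
	if err != nil {
		_ = lockedRepo.Close()
		return nil, err
	}
	return cc, nil
}
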
@@ -4,16 +4,23 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"github.com/filecoin-project/lotus/node/repo"
+	"github.com/ipfs/go-log/v2"
+	"go.uber.org/zap"
 	"io"
 	"os"
+	"path/filepath"
 	"time"

 	hraft "github.com/hashicorp/raft"
+	raftboltdb "github.com/hashicorp/raft-boltdb"
 	p2praft "github.com/libp2p/go-libp2p-raft"
 	host "github.com/libp2p/go-libp2p/core/host"
 	peer "github.com/libp2p/go-libp2p/core/peer"
 )

+var raftLogger = log.Logger("raft-cluster")
+
 // ErrWaitingForSelf is returned when we are waiting for ourselves to depart
 // the peer set, which won't happen
 var errWaitingForSelf = errors.New("waiting for ourselves to depart")
@@ -49,8 +56,9 @@ type raftWrapper struct {
 	snapshotStore hraft.SnapshotStore
 	logStore      hraft.LogStore
 	stableStore   hraft.StableStore
-	//boltdb *raftboltdb.BoltStore
-	staging bool
+	boltdb        *raftboltdb.BoltStore
+	repo          repo.LockedRepo
+	staging       bool
 }

 // newRaftWrapper creates a Raft instance and initializes
@@ -60,6 +68,7 @@ func newRaftWrapper(
 	host host.Host,
 	cfg *ClusterRaftConfig,
 	fsm hraft.FSM,
+	repo repo.LockedRepo,
 	staging bool,
 ) (*raftWrapper, error) {

@@ -67,18 +76,19 @@
 	raftW.config = cfg
 	raftW.host = host
 	raftW.staging = staging
+	raftW.repo = repo
 	// Set correct LocalID
 	cfg.RaftConfig.LocalID = hraft.ServerID(peer.Encode(host.ID()))

-	//df := cfg.GetDataFolder()
-	//err := makeDataFolder(df)
-	//if err != nil {
-	//	return nil, err
-	//}
+	df := cfg.GetDataFolder(repo)
+	err := makeDataFolder(df)
+	if err != nil {
+		return nil, err
+	}

 	raftW.makeServerConfig()

-	err := raftW.makeTransport()
+	err = raftW.makeTransport()
 	if err != nil {
 		return nil, err
 	}
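
makeDataFolder is called above but its body is not in this diff; typically such a helper only guarantees that the data directory exists before the bolt and snapshot files are written into it. A sketch of that idea (assumed, not the actual implementation; the 0700 mode is a guess):

// ensureDataFolder mirrors what a makeDataFolder-style helper usually does:
// create the raft data directory, including parents, if it is missing.
func ensureDataFolder(path string) error {
	return os.MkdirAll(path, 0700)
}
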
@@ -124,13 +134,13 @@ func (rw *raftWrapper) makeTransport() (err error) {
 }

 func (rw *raftWrapper) makeStores() error {
-	//logger.Debug("creating BoltDB store")
-	//df := rw.config.GetDataFolder()
-	//store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db"))
-	//if err != nil {
-	//	return err
-	//}
-	store := hraft.NewInmemStore()
+	logger.Debug("creating BoltDB store")
+	df := rw.config.GetDataFolder(rw.repo)
+	store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db"))
+	if err != nil {
+		return err
+	}
+	//store := hraft.NewInmemStore()
 	// wraps the store in a LogCache to improve performance.
 	// See consul/agent/consul/server.go
 	cacheStore, err := hraft.NewLogCache(RaftLogCacheSize, store)
@@ -138,22 +148,22 @@ func (rw *raftWrapper) makeStores() error {
 		return err
 	}

-	//logger.Debug("creating raft snapshot store")
-	//snapstore, err := hraft.NewFileSnapshotStoreWithLogger(
-	//	df,
-	//	RaftMaxSnapshots,
-	//	raftStdLogger,
-	//)
-	snapstore := hraft.NewInmemSnapshotStore()
-	//if err != nil {
-	//	return err
-	//}
+	logger.Debug("creating raft snapshot store")
+	snapstore, err := hraft.NewFileSnapshotStoreWithLogger(
+		df,
+		RaftMaxSnapshots,
+		zap.NewStdLog(log.Logger("raft-snapshot").SugaredLogger.Desugar()),
+	)
+	if err != nil {
+		return err
+	}
+	//snapstore := hraft.NewInmemSnapshotStore()

 	rw.logStore = cacheStore
 	rw.stableStore = store
 	rw.snapshotStore = snapstore
-	//rw.boltdb = store
+	rw.boltdb = store
 	return nil
 }

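
Taken together, makeStores now wires hashicorp/raft to disk instead of memory: one bolt file acts as both LogStore and StableStore, a LogCache fronts it, and snapshots become files in the same folder. A self-contained sketch of that pattern, separate from the diff (the path, cache size and snapshot count below are illustrative, not values taken from this commit):

package main

import (
	"log"
	"os"
	"path/filepath"

	hraft "github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
)

func main() {
	df := filepath.Join(os.TempDir(), "raft-cluster-example") // illustrative data folder
	if err := os.MkdirAll(df, 0700); err != nil {
		log.Fatal(err)
	}

	// Persistent log + stable store backed by a single bolt file.
	store, err := raftboltdb.NewBoltStore(filepath.Join(df, "raft.db"))
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	// In-memory cache in front of the bolt-backed log store.
	cacheStore, err := hraft.NewLogCache(512, store) // 512 is an arbitrary size
	if err != nil {
		log.Fatal(err)
	}

	// File snapshot store in the same folder, keeping up to 5 snapshots.
	snaps, err := hraft.NewFileSnapshotStore(df, 5, os.Stderr)
	if err != nil {
		log.Fatal(err)
	}

	// These three are what would be handed to hraft.NewRaft(...).
	var (
		logStore      hraft.LogStore      = cacheStore
		stableStore   hraft.StableStore   = store
		snapshotStore hraft.SnapshotStore = snaps
	)
	_, _, _ = logStore, stableStore, snapshotStore
}

Because the bolt store implements both LogStore and StableStore, a single raft.db file is enough for log entries and stable metadata, which is how the wrapper above ends up assigning rw.logStore, rw.stableStore and rw.boltdb from the same object.
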
@@ -420,10 +430,10 @@ func (rw *raftWrapper) Shutdown(ctx context.Context) error {
 		errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
 	}

-	//err = rw.boltdb.Close() // important!
-	//if err != nil {
-	//	errMsgs += "could not close boltdb: " + err.Error()
-	//}
+	err = rw.boltdb.Close() // important!
+	if err != nil {
+		errMsgs += "could not close boltdb: " + err.Error()
+	}

 	if errMsgs != "" {
 		return errors.New(errMsgs)
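
Not from the diff, just context for the "important!" comment: bolt keeps the raft.db file open and holds a lock on it, so skipping Close here would leave the file locked until the process exits. As an aside, on Go 1.20+ the same error aggregation could be written with errors.Join instead of string concatenation; a hedged alternative for comparison only, not what the commit does (closeStores and its parameters are placeholder names):

// closeStores is a hypothetical helper showing errors.Join-based aggregation;
// the commit itself concatenates error strings instead.
func closeStores(r *hraft.Raft, store *raftboltdb.BoltStore) error {
	var errs []error
	if err := r.Shutdown().Error(); err != nil {
		errs = append(errs, fmt.Errorf("could not shutdown raft: %w", err))
	}
	if err := store.Close(); err != nil {
		errs = append(errs, fmt.Errorf("could not close boltdb: %w", err))
	}
	return errors.Join(errs...) // nil when both steps succeed
}
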