This commit is contained in:
Shrenuj Bansal 2022-09-27 16:08:04 +00:00
parent 7470549199
commit 99e7c322eb
21 changed files with 650 additions and 353 deletions

View File

@ -8,7 +8,6 @@ import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/filecoin-project/go-address"
@ -30,6 +29,7 @@ import (
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/types"
consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
)
@ -753,8 +753,8 @@ type FullNode interface {
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin
RaftState(ctx context.Context) (consensus.State, error) //perm:read
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
RaftState(ctx context.Context) (*consensus2.RaftState, error) //perm:read
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
}
type StorageAsk struct {

View File

@ -39,6 +39,7 @@ import (
"github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
consensus "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
@ -339,6 +340,10 @@ func init() {
addExample(map[string]bitfield.BitField{
"": bitfield.NewFromSet([]uint64{5, 6, 7, 10}),
})
addExample(&consensus.RaftState{
NonceMap: make(map[address.Address]uint64),
MsgUuids: make(map[uuid.UUID]*types.SignedMessage),
})
}

View File

@ -14,7 +14,6 @@ import (
uuid "github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
metrics "github.com/libp2p/go-libp2p/core/metrics"
network0 "github.com/libp2p/go-libp2p/core/network"
peer "github.com/libp2p/go-libp2p/core/peer"
@ -38,6 +37,7 @@ import (
miner0 "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
types "github.com/filecoin-project/lotus/chain/types"
alerting "github.com/filecoin-project/lotus/journal/alerting"
consensus "github.com/filecoin-project/lotus/lib/consensus/raft"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
imports "github.com/filecoin-project/lotus/node/repo/imports"
)
@ -2260,10 +2260,10 @@ func (mr *MockFullNodeMockRecorder) RaftLeader(arg0 interface{}) *gomock.Call {
}
// RaftState mocks base method.
func (m *MockFullNode) RaftState(arg0 context.Context) (consensus.State, error) {
func (m *MockFullNode) RaftState(arg0 context.Context) (*consensus.RaftState, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RaftState", arg0)
ret0, _ := ret[0].(consensus.State)
ret0, _ := ret[0].(*consensus.RaftState)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View File

@ -10,7 +10,6 @@ import (
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
@ -37,6 +36,7 @@ import (
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal/alerting"
consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/imports"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
@ -342,7 +342,7 @@ type FullNodeStruct struct {
RaftLeader func(p0 context.Context) (peer.ID, error) `perm:"read"`
RaftState func(p0 context.Context) (consensus.State, error) `perm:"read"`
RaftState func(p0 context.Context) (*consensus2.RaftState, error) `perm:"read"`
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"`
@ -2457,15 +2457,15 @@ func (s *FullNodeStub) RaftLeader(p0 context.Context) (peer.ID, error) {
return *new(peer.ID), ErrNotSupported
}
func (s *FullNodeStruct) RaftState(p0 context.Context) (consensus.State, error) {
func (s *FullNodeStruct) RaftState(p0 context.Context) (*consensus2.RaftState, error) {
if s.Internal.RaftState == nil {
return *new(consensus.State), ErrNotSupported
return nil, ErrNotSupported
}
return s.Internal.RaftState(p0)
}
func (s *FullNodeStub) RaftState(p0 context.Context) (consensus.State, error) {
return *new(consensus.State), ErrNotSupported
func (s *FullNodeStub) RaftState(p0 context.Context) (*consensus2.RaftState, error) {
return nil, ErrNotSupported
}
func (s *FullNodeStruct) StateAccountKey(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) {

Binary file not shown.

View File

@ -9,8 +9,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
@ -38,10 +36,10 @@ type MsgSigner interface {
NextNonce(ctx context.Context, addr address.Address) (uint64, error)
SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error
dstoreKey(addr address.Address) datastore.Key
IsLeader(ctx context.Context) bool
Leader(ctx context.Context) (peer.ID, error)
RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
GetRaftState(ctx context.Context) (consensus.State, error)
//IsLeader(ctx context.Context) bool
//Leader(ctx context.Context) (peer.ID, error)
//RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
//GetRaftState(ctx context.Context) (consensus.State, error)
}
// MessageSigner keeps track of nonces per address, and increments the nonce
@ -200,18 +198,18 @@ func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
}
func (ms *MessageSigner) IsLeader(ctx context.Context) bool {
return true
}
func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
return false, xerrors.Errorf("single node shouldn't have any redirects")
}
func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) {
return nil, xerrors.Errorf("this is a non raft consensus message signer")
}
func (ms *MessageSigner) Leader(ctx context.Context) (peer.ID, error) {
return "", xerrors.Errorf("no leaders in non raft message signer")
}
//func (ms *MessageSigner) IsLeader(ctx context.Context) bool {
// return true
//}
//
//func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
// return false, xerrors.Errorf("single node shouldn't have any redirects")
//}
//
//func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) {
// return nil, xerrors.Errorf("this is a non raft consensus message signer")
//}
//
//func (ms *MessageSigner) Leader(ctx context.Context) (peer.ID, error) {
// return "", xerrors.Errorf("no leaders in non raft message signer")
//}

View File

@ -6,7 +6,6 @@ import (
"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
libp2pconsensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
"golang.org/x/xerrors"
@ -77,12 +76,12 @@ func (ms *MessageSignerConsensus) SignMessage(
}
func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
state, err := ms.consensus.State(ctx)
cstate, err := ms.consensus.State(ctx)
if err != nil {
return nil, err
}
cstate := state.(consensus.RaftState)
//cstate := state.(consensus.RaftState)
msg, ok := cstate.MsgUuids[uuid]
if !ok {
return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid)
@ -90,7 +89,7 @@ func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uui
return msg, nil
}
func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (libp2pconsensus.State, error) {
func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (*consensus.RaftState, error) {
return ms.consensus.State(ctx)
}

View File

@ -8,6 +8,7 @@ import (
"net/url"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
@ -36,7 +37,7 @@ const (
// 2. *_API_INFO environment variables
// 3. deprecated *_API_INFO environment variables
// 4. *-repo command line flags.
func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) {
// Check if there was a flag passed with the listen address of the API
// server (only used by the tests)
for _, f := range t.APIFlags() {
@ -46,7 +47,7 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
strma := ctx.String(f)
strma = strings.TrimSpace(strma)
return APIInfo{Addr: strma}, nil
return []APIInfo{APIInfo{Addr: strma}}, nil
}
//
@ -56,14 +57,14 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
primaryEnv, fallbacksEnvs, deprecatedEnvs := t.APIInfoEnvVars()
env, ok := os.LookupEnv(primaryEnv)
if ok {
return ParseApiInfo(env), nil
return ParseApiInfoMulti(env), nil
}
for _, env := range deprecatedEnvs {
env, ok := os.LookupEnv(env)
if ok {
log.Warnf("Using deprecated env(%s) value, please use env(%s) instead.", env, primaryEnv)
return ParseApiInfo(env), nil
return ParseApiInfoMulti(env), nil
}
}
@ -76,26 +77,26 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
p, err := homedir.Expand(path)
if err != nil {
return APIInfo{}, xerrors.Errorf("could not expand home dir (%s): %w", f, err)
return []APIInfo{}, xerrors.Errorf("could not expand home dir (%s): %w", f, err)
}
r, err := repo.NewFS(p)
if err != nil {
return APIInfo{}, xerrors.Errorf("could not open repo at path: %s; %w", p, err)
return []APIInfo{}, xerrors.Errorf("could not open repo at path: %s; %w", p, err)
}
exists, err := r.Exists()
if err != nil {
return APIInfo{}, xerrors.Errorf("repo.Exists returned an error: %w", err)
return []APIInfo{}, xerrors.Errorf("repo.Exists returned an error: %w", err)
}
if !exists {
return APIInfo{}, errors.New("repo directory does not exist. Make sure your configuration is correct")
return []APIInfo{}, errors.New("repo directory does not exist. Make sure your configuration is correct")
}
ma, err := r.APIEndpoint()
if err != nil {
return APIInfo{}, xerrors.Errorf("could not get api endpoint: %w", err)
return []APIInfo{}, xerrors.Errorf("could not get api endpoint: %w", err)
}
token, err := r.APIToken()
@ -103,38 +104,80 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
log.Warnf("Couldn't load CLI token, capabilities may be limited: %v", err)
}
return APIInfo{
return []APIInfo{APIInfo{
Addr: ma.String(),
Token: token,
}, nil
}}, nil
}
for _, env := range fallbacksEnvs {
env, ok := os.LookupEnv(env)
if ok {
return ParseApiInfo(env), nil
return ParseApiInfoMulti(env), nil
}
}
return APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type())
return []APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type())
}
func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) {
ainfos, err := GetAPIInfoMulti(ctx, t)
if err != nil {
return APIInfo{}, err
}
if len(ainfos) > 1 {
log.Warn("multiple API infos received when only one was expected")
}
return ainfos[0], nil
}
type HttpHead struct {
addr string
header http.Header
}
func GetRawAPIMulti(ctx *cli.Context, t repo.RepoType, version string) ([]HttpHead, error) {
var httpHeads []HttpHead
ainfos, err := GetAPIInfoMulti(ctx, t)
if err != nil {
return httpHeads, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err)
}
for _, ainfo := range ainfos {
addr, err := ainfo.DialArgs(version)
if err != nil {
return httpHeads, xerrors.Errorf("could not get DialArgs: %w", err)
}
httpHeads = append(httpHeads, HttpHead{addr: addr, header: ainfo.AuthHeader()})
}
//addr, err := ainfo.DialArgs(version)
//if err != nil {
// return "", nil, xerrors.Errorf("could not get DialArgs: %w", err)
//}
if IsVeryVerbose {
_, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, httpHeads[0].addr)
}
return httpHeads, nil
}
func GetRawAPI(ctx *cli.Context, t repo.RepoType, version string) (string, http.Header, error) {
ainfo, err := GetAPIInfo(ctx, t)
heads, err := GetRawAPIMulti(ctx, t, version)
if err != nil {
return "", nil, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err)
return "", nil, err
}
addr, err := ainfo.DialArgs(version)
if err != nil {
return "", nil, xerrors.Errorf("could not get DialArgs: %w", err)
if len(heads) > 1 {
log.Warnf("More than 1 header received when expecting only one")
}
if IsVeryVerbose {
_, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, addr)
}
return addr, ainfo.AuthHeader(), nil
return heads[0].addr, heads[0].header, nil
}
func GetCommonAPI(ctx *cli.Context) (api.CommonNet, jsonrpc.ClientCloser, error) {
@ -185,6 +228,76 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err
return client.NewFullNodeRPCV0(ctx.Context, addr, headers)
}
func RouteRequest() {
}
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
outs := api.GetInternalStructs(outstr)
var rins []reflect.Value
//peertoNode := make(map[peer.ID]reflect.Value)
for _, in := range ins {
rin := reflect.ValueOf(in)
rins = append(rins, rin)
//peertoNode[ins] = rin
}
for _, out := range outs {
rint := reflect.ValueOf(out).Elem()
//ra := reflect.ValueOf(in)
for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
var fns []reflect.Value
for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name))
}
//fn := ra.MethodByName(field.Name)
//curr := 0
//total := len(rins)
//retryFunc := func(args []reflect.Value) (results []reflect.Value) {
// //ctx := args[0].Interface().(context.Context)
// //
// //rin := peertoNode[ins[0].Leader(ctx)]
// //fn := rin.MethodByName(field.Name)
// //
// //return fn.Call(args)
//
// toCall := curr
// curr += 1 % total
// return fns[toCall].Call(args)
//}
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
//errorsToRetry := []error{&jsonrpc.RPCConnectionError{}}
//initialBackoff, err := time.ParseDuration("1s")
//if err != nil {
// return nil
//}
//result, err := retry.Retry(5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) {
// //ctx := args[0].Interface().(context.Context)
// //
// //rin := peertoNode[ins[0].Leader(ctx)]
// //fn := rin.MethodByName(field.Name)
// //
// //return fn.Call(args)
//
// toCall := curr
// curr += 1 % total
// result := fns[toCall].Call(args)
// return result, results[len(results)-1].Interface().(error)
//})
//return result
return fns[0].Call(args)
}))
}
}
}
func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(v1api.FullNode), func() {}, nil
@ -214,6 +327,51 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e
return v1API, closer, nil
}
func GetFullNodeAPIV1New(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(v1api.FullNode), func() {}, nil
}
heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1")
if err != nil {
return nil, nil, err
}
if IsVeryVerbose {
_, _ = fmt.Fprintln(ctx.App.Writer, "using full node API v1 endpoint:", heads[0].addr)
}
var fullNodes []api.FullNode
var closers []jsonrpc.ClientCloser
for _, head := range heads {
v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header)
if err != nil {
return nil, nil, err
}
fullNodes = append(fullNodes, v1api)
closers = append(closers, closer)
}
finalCloser := func() {
for _, c := range closers {
c()
}
}
var v1API api.FullNodeStruct
FullNodeProxy(fullNodes, &v1API)
v, err := v1API.Version(ctx.Context)
if err != nil {
return nil, nil, err
}
if !v.APIVersion.EqMajorMinor(api.FullAPIVersion1) {
return nil, nil, xerrors.Errorf("Remote API version didn't match (expected %s, remote %s)", api.FullAPIVersion1, v.APIVersion)
}
return &v1API, finalCloser, nil
}
type GetStorageMinerOptions struct {
PreferHttp bool
}

View File

@ -24,6 +24,7 @@ type APIInfo struct {
func ParseApiInfo(s string) APIInfo {
var tok []byte
if infoWithToken.Match([]byte(s)) {
sp := strings.SplitN(s, ":", 2)
tok = []byte(sp[0])
@ -36,6 +37,31 @@ func ParseApiInfo(s string) APIInfo {
}
}
func ParseApiInfoMulti(s string) []APIInfo {
var apiInfos []APIInfo
if infoWithToken.Match([]byte(s)) {
allAddrs := strings.SplitN(s, ",", -1)
for _, addr := range allAddrs {
sp := strings.SplitN(addr, ":", 2)
//tok = []byte(sp[0])
//s = sp[1]
apiInfos = append(apiInfos, APIInfo{
Addr: sp[1],
Token: []byte(sp[0]),
})
}
}
//return APIInfo{
// Addr: s,
// Token: tok,
//}
return apiInfos
}
func (a APIInfo) DialArgs(version string) (string, error) {
ma, err := multiaddr.NewMultiaddr(a.Addr)
if err == nil {

View File

@ -5069,7 +5069,13 @@ Perms: read
Inputs: `null`
Response: `{}`
Response:
```json
{
"NonceMap": {},
"MsgUuids": {}
}
```
## State
The State methods are used to query, inspect, and interact with chain state.

View File

@ -1,228 +0,0 @@
[API]
# Binding address for the Lotus API
#
# type: string
# env var: LOTUS_API_LISTENADDRESS
#ListenAddress = "/ip4/127.0.0.1/tcp/1234/http"
# type: string
# env var: LOTUS_API_REMOTELISTENADDRESS
#RemoteListenAddress = ""
# type: Duration
# env var: LOTUS_API_TIMEOUT
#Timeout = "30s"
[Backup]
# When set to true disables metadata log (.lotus/kvlog). This can save disk
# space by reducing metadata redundancy.
#
# Note that in case of metadata corruption it might be much harder to recover
# your node if metadata log is disabled
#
# type: bool
# env var: LOTUS_BACKUP_DISABLEMETADATALOG
#DisableMetadataLog = true
[Logging]
[Logging.SubsystemLevels]
# env var: LOTUS_LOGGING_SUBSYSTEMLEVELS_EXAMPLE-SUBSYSTEM
#example-subsystem = "INFO"
[Libp2p]
# Binding address for the libp2p host - 0 means random port.
# Format: multiaddress; see https://multiformats.io/multiaddr/
#
# type: []string
# env var: LOTUS_LIBP2P_LISTENADDRESSES
#ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"]
# Addresses to explicitally announce to other peers. If not specified,
# all interface addresses are announced
# Format: multiaddress
#
# type: []string
# env var: LOTUS_LIBP2P_ANNOUNCEADDRESSES
#AnnounceAddresses = []
# Addresses to not announce
# Format: multiaddress
#
# type: []string
# env var: LOTUS_LIBP2P_NOANNOUNCEADDRESSES
#NoAnnounceAddresses = []
# When not disabled (default), lotus asks NAT devices (e.g., routers), to
# open up an external port and forward it to the port lotus is running on.
# When this works (i.e., when your router supports NAT port forwarding),
# it makes the local lotus node accessible from the public internet
#
# type: bool
# env var: LOTUS_LIBP2P_DISABLENATPORTMAP
#DisableNatPortMap = false
# ConnMgrLow is the number of connections that the basic connection manager
# will trim down to.
#
# type: uint
# env var: LOTUS_LIBP2P_CONNMGRLOW
#ConnMgrLow = 150
# ConnMgrHigh is the number of connections that, when exceeded, will trigger
# a connection GC operation. Note: protected/recently formed connections don't
# count towards this limit.
#
# type: uint
# env var: LOTUS_LIBP2P_CONNMGRHIGH
#ConnMgrHigh = 180
# ConnMgrGrace is a time duration that new connections are immune from being
# closed by the connection manager.
#
# type: Duration
# env var: LOTUS_LIBP2P_CONNMGRGRACE
#ConnMgrGrace = "20s"
[Pubsub]
# Run the node in bootstrap-node mode
#
# type: bool
# env var: LOTUS_PUBSUB_BOOTSTRAPPER
#Bootstrapper = false
# type: string
# env var: LOTUS_PUBSUB_REMOTETRACER
#RemoteTracer = ""
[Client]
# type: bool
# env var: LOTUS_CLIENT_USEIPFS
#UseIpfs = false
# type: bool
# env var: LOTUS_CLIENT_IPFSONLINEMODE
#IpfsOnlineMode = false
# type: string
# env var: LOTUS_CLIENT_IPFSMADDR
#IpfsMAddr = ""
# type: bool
# env var: LOTUS_CLIENT_IPFSUSEFORRETRIEVAL
#IpfsUseForRetrieval = false
# The maximum number of simultaneous data transfers between the client
# and storage providers for storage deals
#
# type: uint64
# env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORSTORAGE
#SimultaneousTransfersForStorage = 20
# The maximum number of simultaneous data transfers between the client
# and storage providers for retrieval deals
#
# type: uint64
# env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORRETRIEVAL
#SimultaneousTransfersForRetrieval = 20
# Require that retrievals perform no on-chain operations. Paid retrievals
# without existing payment channels with available funds will fail instead
# of automatically performing on-chain operations.
#
# type: bool
# env var: LOTUS_CLIENT_OFFCHAINRETRIEVAL
#OffChainRetrieval = false
[Wallet]
# type: string
# env var: LOTUS_WALLET_REMOTEBACKEND
#RemoteBackend = ""
# type: bool
# env var: LOTUS_WALLET_ENABLELEDGER
#EnableLedger = false
# type: bool
# env var: LOTUS_WALLET_DISABLELOCAL
#DisableLocal = false
[Fees]
# type: types.FIL
# env var: LOTUS_FEES_DEFAULTMAXFEE
#DefaultMaxFee = "0.07 FIL"
[Chainstore]
# type: bool
# env var: LOTUS_CHAINSTORE_ENABLESPLITSTORE
#EnableSplitstore = false
[Chainstore.Splitstore]
# ColdStoreType specifies the type of the coldstore.
# It can be "universal" (default) or "discard" for discarding cold blocks.
#
# type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORETYPE
#ColdStoreType = "universal"
# HotStoreType specifies the type of the hotstore.
# Only currently supported value is "badger".
#
# type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTORETYPE
#HotStoreType = "badger"
# MarkSetType specifies the type of the markset.
# It can be "map" for in memory marking or "badger" (default) for on-disk marking.
#
# type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE
#MarkSetType = "badger"
# HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond
# the compaction boundary; default is 0.
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMESSAGERETENTION
#HotStoreMessageRetention = 0
# HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore.
# A value of 0 disables, while a value 1 will do full GC in every compaction.
# Default is 20 (about once a week).
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
#HotStoreFullGCFrequency = 20
# EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning
# where hotstore compaction occurs every finality epochs pruning happens every 3 finalities
# Default is false
#
# type: bool
# env var: LOTUS_CHAINSTORE_SPLITSTORE_ENABLECOLDSTOREAUTOPRUNE
#EnableColdStoreAutoPrune = false
# ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore.
# Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do
# full GC in every prune.
# Default is 7 (about once every a week)
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTOREFULLGCFREQUENCY
#ColdStoreFullGCFrequency = 7
# ColdStoreRetention specifies the retention policy for data reachable from the chain, in
# finalities beyond the compaction boundary, default is 0, -1 retains everything
#
# type: int64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORERETENTION
#ColdStoreRetention = 0

View File

@ -297,7 +297,7 @@ func startNodes(
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, _ := kit.CreateRPCServer(t, handler, l)
srv, _, _ := kit.CreateRPCServer(t, handler, l)
// Create a gateway client API that connects to the gateway server
var gapi api.Gateway

View File

@ -9,6 +9,7 @@ import (
"io/ioutil"
"net"
"net/http"
"reflect"
"sync"
"testing"
"time"
@ -81,7 +82,7 @@ func init() {
//
// var full TestFullNode
// var miner TestMiner
// ens.FullNode(&full, opts...) // populates a full node
// ens.FullNodes(&full, opts...) // populates a full node
// ens.Miner(&miner, &full, opts...) // populates a miner, using the full node as its chain daemon
//
// It is possible to pass functional options to set initial balances,
@ -101,7 +102,7 @@ func init() {
//
// The API is chainable, so it's possible to do a lot in a very succinct way:
//
// kit.NewEnsemble().FullNode(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining()
// kit.NewEnsemble().FullNodes(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining()
//
// You can also find convenient fullnode:miner presets, such as 1:1, 1:2,
// and 2:1, e.g.:
@ -318,6 +319,12 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO
return n
}
//func (n *Ensemble) MinerWithMultipleNodes(minerNode *TestMiner, full []*TestFullNode, opts ...NodeOpt) *Ensemble {
// n.MinerEnroll(minerNode, full, opts...)
// n.AddInactiveMiner(minerNode)
// return n
//}
// Worker enrolls a new worker, using the provided full node for chain
// interactions.
func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...NodeOpt) *Ensemble {
@ -344,6 +351,43 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
return n
}
func proxy(ins []api.FullNode, outstr *api.FullNodeStruct) {
outs := api.GetInternalStructs(outstr)
var rins []reflect.Value
//peertoNode := make(map[peer.ID]reflect.Value)
for _, in := range ins {
rin := reflect.ValueOf(in)
rins = append(rins, rin)
//peertoNode[ins] = rin
}
for _, out := range outs {
rint := reflect.ValueOf(out).Elem()
//ra := reflect.ValueOf(in)
for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
var fns []reflect.Value
for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name))
}
//fn := ra.MethodByName(field.Name)
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
//ctx := args[0].Interface().(context.Context)
//
//rin := peertoNode[ins[0].Leader(ctx)]
//fn := rin.MethodByName(field.Name)
//
//return fn.Call(args)
return fns[0].Call(args)
}))
}
}
}
// Start starts all enrolled nodes.
func (n *Ensemble) Start() *Ensemble {
ctx := context.Background()
@ -463,10 +507,17 @@ func (n *Ensemble) Start() *Ensemble {
err = full.WalletSetDefault(context.Background(), addr)
require.NoError(n.t, err)
var rpcShutdownOnce sync.Once
// Are we hitting this node through its RPC?
if full.options.rpc {
withRPC := fullRpc(n.t, full)
withRPC, rpcCloser := fullRpc(n.t, full)
n.inactive.fullnodes[i] = withRPC
full.Stop = func(ctx2 context.Context) error {
//rpcCloser()
rpcShutdownOnce.Do(rpcCloser)
return stop(ctx)
}
n.t.Cleanup(func() { rpcShutdownOnce.Do(rpcCloser) })
}
n.t.Cleanup(func() {
@ -563,7 +614,7 @@ func (n *Ensemble) Start() *Ensemble {
}
// // Set it as the default address.
// err = m.FullNode.WalletSetDefault(ctx, m.OwnerAddr.Address)
// err = m.FullNodes.WalletSetDefault(ctx, m.OwnerAddr.Address)
// require.NoError(n.t, err)
r := repo.NewMemory(nil)
@ -663,7 +714,21 @@ func (n *Ensemble) Start() *Ensemble {
assigner := m.options.minerAssigner
disallowRemoteFinalize := m.options.disallowRemoteFinalize
//var wrappedFullNode api.FullNodeStruct
//var fullNodes []api.FullNode
//for _, fn := range m.FullNodes {
// fullNodes = append(fullNodes, fn.FullNode)
//}
//proxy(fullNodes, &wrappedFullNode)
var mineBlock = make(chan lotusminer.MineReq)
copy := *m.FullNode
copy.FullNode = modules.MakeUuidWrapper(copy.FullNode)
m.FullNode = &copy
//m.FullNode.FullNode = modules.MakeUuidWrapper(fn.FullNode)
opts := []node.Option{
node.StorageMiner(&m.StorageMiner, cfg.Subsystems),
node.Base(),
@ -671,7 +736,9 @@ func (n *Ensemble) Start() *Ensemble {
node.Test(),
node.If(m.options.disableLibp2p, node.MockHost(n.mn)),
node.Override(new(v1api.RawFullNodeAPI), m.FullNode.FullNode),
//node.Override(new(v1api.RawFullNodeAPI), func() api.FullNode { return modules.MakeUuidWrapper(m.FullNode) }),
//node.Override(new(v1api.RawFullNodeAPI), modules.MakeUuidWrapper),
node.Override(new(v1api.RawFullNodeAPI), m.FullNode),
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)),
// disable resource filtering so that local worker gets assigned tasks

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/wallet/key"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/node"
)
@ -44,6 +45,27 @@ type TestFullNode struct {
options nodeOpts
}
func MergeFullNodes(fullNodes []*TestFullNode) *TestFullNode {
var wrappedFullNode TestFullNode
//var fnapis []api.FullNode
//for _, fullNode := range fullNodes {
// fnapis = append(fnapis, fullNode)
//}
var fns api.FullNodeStruct
wrappedFullNode.FullNode = &fns
cliutil.FullNodeProxy(fullNodes, &fns)
wrappedFullNode.t = fullNodes[0].t
wrappedFullNode.ListenAddr = fullNodes[0].ListenAddr
wrappedFullNode.DefaultKey = fullNodes[0].DefaultKey
wrappedFullNode.Stop = fullNodes[0].Stop
wrappedFullNode.options = fullNodes[0].options
return &wrappedFullNode
}
func (f TestFullNode) Shutdown(ctx context.Context) error {
return f.Stop(ctx)
}

View File

@ -18,22 +18,29 @@ import (
"github.com/filecoin-project/lotus/node"
)
func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) {
type Closer func()
func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr, Closer) {
testServ := &httptest.Server{
Listener: listener,
Config: &http.Server{Handler: handler},
}
testServ.Start()
t.Cleanup(func() {
waitUpTo(testServ.Close, time.Second, "Gave up waiting for RPC server to close after 1s")
})
t.Cleanup(testServ.CloseClientConnections)
//t.Cleanup(func() {
// waitUpTo(testServ.Close, time.Second, "Gave up waiting for RPC server to close after 1s")
//})
//t.Cleanup(testServ.CloseClientConnections)
addr := testServ.Listener.Addr()
maddr, err := manet.FromNetAddr(addr)
require.NoError(t, err)
return testServ, maddr
closer := func() {
testServ.CloseClientConnections()
testServ.Close()
}
return testServ, maddr, closer
}
func waitUpTo(fn func(), waitTime time.Duration, errMsg string) {
@ -51,30 +58,30 @@ func waitUpTo(fn func(), waitTime time.Duration, errMsg string) {
}
}
func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) {
handler, err := node.FullNodeHandler(f.FullNode, false)
require.NoError(t, err)
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler, l)
srv, maddr, rpcCloser := CreateRPCServer(t, handler, l)
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
t.Cleanup(stop)
//t.Cleanup(stop)
f.ListenAddr, f.FullNode = maddr, cl
return f
return f, func() { stop(); rpcCloser() }
}
func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
handler, err := node.MinerHandler(m.StorageMiner, false)
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String())
fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
@ -92,7 +99,7 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
handler := sealworker.WorkerHandler(m.MinerNode.AuthVerify, m.FetchHandler, m.Worker, false)
srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Println("creating RPC server for a worker at: ", srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"

View File

@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
@ -41,16 +42,16 @@ func generatePrivKey() (*kit.Libp2p, error) {
return &kit.Libp2p{PeerID: peerId, PrivKey: privkey}, nil
}
func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) consensus.RaftState {
func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *consensus.RaftState {
raftState, err := node.RaftState(ctx)
require.NoError(t, err)
rstate := raftState.(consensus.RaftState)
return rstate
//rstate := raftState.(*consensus.RaftState)
return raftState
}
func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *kit.TestFullNode, node2 *kit.TestFullNode, miner *kit.TestMiner) *kit.Ensemble {
blockTime := 5 * time.Millisecond
blockTime := 1000 * time.Millisecond
pkey0, _ := generatePrivKey()
pkey1, _ := generatePrivKey()
@ -72,13 +73,17 @@ func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *ki
node.Override(node.GoRPCServer, modules.NewRPCServer),
)
//raftOps := kit.ConstructorOpts()
kit.ThroughRPC()
ens := kit.NewEnsemble(t).FullNode(node0, raftOps).FullNode(node1, raftOps).FullNode(node2, raftOps)
ens := kit.NewEnsemble(t).FullNode(node0, raftOps, kit.ThroughRPC()).FullNode(node1, raftOps, kit.ThroughRPC()).FullNode(node2, raftOps, kit.ThroughRPC())
node0.AssignPrivKey(pkey0)
node1.AssignPrivKey(pkey1)
node2.AssignPrivKey(pkey2)
ens.MinerEnroll(miner, node0, kit.WithAllSubsystems())
nodes := []*kit.TestFullNode{node0, node1, node2}
wrappedFullNode := kit.MergeFullNodes(nodes)
ens.MinerEnroll(miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC())
ens.Start()
// Import miner wallet to all nodes
@ -234,3 +239,143 @@ func TestRaftStateLeaderDisconnects(t *testing.T) {
}
}
}
func TestRaftStateMiner(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
var (
node0 kit.TestFullNode
node1 kit.TestFullNode
node2 kit.TestFullNode
miner kit.TestMiner
)
setup(ctx, t, &node0, &node1, &node2, &miner)
fmt.Println(node0.WalletList(context.Background()))
fmt.Println(node1.WalletList(context.Background()))
fmt.Println(node2.WalletList(context.Background()))
bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address)
require.NoError(t, err)
msgHalfBal := &types.Message{
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.Div(bal, big.NewInt(2)),
}
mu := uuid.New()
smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{
MsgUuid: mu,
})
require.NoError(t, err)
mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
rstate0 := getRaftState(ctx, t, &node0)
rstate1 := getRaftState(ctx, t, &node1)
rstate2 := getRaftState(ctx, t, &node2)
require.EqualValues(t, rstate0, rstate1)
require.EqualValues(t, rstate0, rstate2)
}
func TestRaftStateLeaderDisconnectsMiner(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
var (
node0 kit.TestFullNode
node1 kit.TestFullNode
node2 kit.TestFullNode
miner kit.TestMiner
)
nodes := []*kit.TestFullNode{&node0, &node1, &node2}
setup(ctx, t, &node0, &node1, &node2, &miner)
peerToNode := make(map[peer.ID]*kit.TestFullNode)
for _, n := range nodes {
peerToNode[n.Pkey.PeerID] = n
}
bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address)
require.NoError(t, err)
msgHalfBal := &types.Message{
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.Div(bal, big.NewInt(2)),
}
mu := uuid.New()
smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{
MsgUuid: mu,
})
require.NoError(t, err)
mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
rstate0 := getRaftState(ctx, t, &node0)
rstate1 := getRaftState(ctx, t, &node1)
rstate2 := getRaftState(ctx, t, &node2)
require.True(t, reflect.DeepEqual(rstate0, rstate1))
require.True(t, reflect.DeepEqual(rstate0, rstate2))
leader, err := node1.RaftLeader(ctx)
require.NoError(t, err)
leaderNode := peerToNode[leader]
//
//err = leaderNode.Stop(ctx)
//require.NoError(t, err)
//oldLeaderNode := leaderNode
//
//time.Sleep(5 * time.Second)
//
//newLeader := leader
//for _, n := range nodes {
// if n != leaderNode {
// newLeader, err = n.RaftLeader(ctx)
// require.NoError(t, err)
// require.NotEqual(t, newLeader, leader)
// }
//}
//
//require.NotEqual(t, newLeader, leader)
//leaderNode = peerToNode[newLeader]
err = node0.Stop(ctx)
require.NoError(t, err)
msg2 := &types.Message{
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.NewInt(100000),
}
mu2 := uuid.New()
time.Sleep(5 * time.Second)
signedMsg2, err := miner.FullNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{
MaxFee: abi.TokenAmount(config.DefaultDefaultMaxFee),
MsgUuid: mu2,
})
require.NoError(t, err)
mLookup, err = leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
//rstate := getRaftState(ctx, t, leaderNode)
//for _, n := range nodes {
// if n != oldLeaderNode {
// rs := getRaftState(ctx, t, n)
// require.True(t, reflect.DeepEqual(rs, rstate))
// }
//}
}

View File

@ -4,6 +4,7 @@ package consensus
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
@ -28,16 +29,69 @@ import (
var logger = logging.Logger("raft")
type NonceMapType map[addr.Address]uint64
type MsgUuidMapType map[uuid.UUID]*types.SignedMessage
type RaftState struct {
NonceMap map[addr.Address]uint64
MsgUuids map[uuid.UUID]*types.SignedMessage
NonceMap NonceMapType
MsgUuids MsgUuidMapType
}
func newRaftState() RaftState {
return RaftState{NonceMap: make(map[addr.Address]uint64),
func newRaftState() *RaftState {
return &RaftState{NonceMap: make(map[addr.Address]uint64),
MsgUuids: make(map[uuid.UUID]*types.SignedMessage)}
}
func (n *NonceMapType) MarshalJSON() ([]byte, error) {
marshalled := make(map[string]uint64)
for a, n := range *n {
marshalled[a.String()] = n
}
return json.Marshal(marshalled)
}
func (n *NonceMapType) UnmarshalJSON(b []byte) error {
unmarshalled := make(map[string]uint64)
err := json.Unmarshal(b, &unmarshalled)
if err != nil {
return err
}
*n = make(map[addr.Address]uint64)
for saddr, nonce := range unmarshalled {
a, err := addr.NewFromString(saddr)
if err != nil {
return err
}
(*n)[a] = nonce
}
return nil
}
func (m *MsgUuidMapType) MarshalJSON() ([]byte, error) {
marshalled := make(map[string]*types.SignedMessage)
for u, msg := range *m {
marshalled[u.String()] = msg
}
return json.Marshal(marshalled)
}
func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
unmarshalled := make(map[string]*types.SignedMessage)
err := json.Unmarshal(b, &unmarshalled)
if err != nil {
return err
}
*m = make(map[uuid.UUID]*types.SignedMessage)
for suid, msg := range unmarshalled {
u, err := uuid.Parse(suid)
if err != nil {
return err
}
(*m)[u] = msg
}
return nil
}
type ConsensusOp struct {
Nonce uint64 `codec:"nonce,omitempty"`
Uuid uuid.UUID `codec:"uuid,omitempty"`
@ -46,7 +100,7 @@ type ConsensusOp struct {
}
func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
s := state.(RaftState)
s := state.(*RaftState)
s.NonceMap[c.Addr] = c.Nonce
s.MsgUuids[c.Uuid] = c.SignedMsg
return s, nil
@ -67,7 +121,7 @@ type Consensus struct {
consensus consensus.OpLogConsensus
actor consensus.Actor
raft *raftWrapper
state RaftState
state *RaftState
rpcClient *rpc.Client
rpcReady chan struct{}
@ -138,7 +192,8 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host,
if err != nil {
return nil, err
}
cc.SetClient(rpcClient)
cc.rpcClient = rpcClient
cc.rpcReady <- struct{}{}
return cc, nil
}
}
@ -245,12 +300,6 @@ func (cc *Consensus) Shutdown(ctx context.Context) error {
return nil
}
// SetClient makes the component ready to perform RPC requets
func (cc *Consensus) SetClient(c *rpc.Client) {
cc.rpcClient = c
cc.rpcReady <- struct{}{}
}
// Ready returns a channel which is signaled when the Consensus
// algorithm has finished bootstrapping and is ready to use
func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} {
@ -451,7 +500,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error {
// last agreed-upon RaftState known by this node. No writes are allowed, as all
// writes to the shared state should happen through the Consensus component
// methods.
func (cc *Consensus) State(ctx context.Context) (consensus.State, error) {
func (cc *Consensus) State(ctx context.Context) (*RaftState, error) {
//_, span := trace.StartSpan(ctx, "consensus/RaftState")
//defer span.End()
@ -463,7 +512,7 @@ func (cc *Consensus) State(ctx context.Context) (consensus.State, error) {
if err != nil {
return nil, err
}
state, ok := st.(RaftState)
state, ok := st.(*RaftState)
if !ok {
return nil, errors.New("wrong state type")
}

View File

@ -4,21 +4,10 @@ import (
"context"
consensus "github.com/libp2p/go-libp2p-consensus"
rpc "github.com/libp2p/go-libp2p-gorpc"
"github.com/libp2p/go-libp2p/core/peer"
)
// Component represents a piece of ipfscluster. Cluster components
// usually run their own goroutines (a http server for example). They
// communicate with the main Cluster component and other components
// (both local and remote), using an instance of rpc.Client.
type Component interface {
SetClient(*rpc.Client)
Shutdown(context.Context) error
}
type ConsensusAPI interface {
Component
// Returns a channel to signal that the consensus layer is ready
// allowing the main component to wait for it during start.
Ready(context.Context) <-chan struct{}
@ -47,4 +36,6 @@ type ConsensusAPI interface {
Distrust(context.Context, peer.ID) error
// Returns true if current node is the cluster leader
IsLeader(ctx context.Context) bool
Shutdown(context.Context) error
}

View File

@ -5,11 +5,11 @@ import (
"time"
logging "github.com/ipfs/go-log/v2"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/impl/client"
"github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/impl/full"
@ -35,6 +35,7 @@ type FullNodeAPI struct {
full.MsigAPI
full.WalletAPI
full.SyncAPI
full.RaftAPI
DS dtypes.MetadataDS
NetworkName dtypes.NetworkName
@ -118,8 +119,12 @@ func (n *FullNodeAPI) NodeStatus(ctx context.Context, inclChainStatus bool) (sta
return status, nil
}
func (n *FullNodeAPI) RaftState(ctx context.Context) (consensus.State, error) {
return n.MpoolAPI.GetRaftState(ctx)
func (n *FullNodeAPI) RaftState(ctx context.Context) (*consensus2.RaftState, error) {
return n.RaftAPI.GetRaftState(ctx)
}
func (n *FullNodeAPI) RaftLeader(ctx context.Context) (peer.ID, error) {
return n.RaftAPI.Leader(ctx)
}
var _ api.FullNode = &FullNodeAPI{}

View File

@ -5,8 +5,6 @@ import (
"encoding/json"
"github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -46,6 +44,8 @@ type MpoolAPI struct {
GasAPI
RaftAPI
MessageSigner messagesigner.MsgSigner
// MessageSigner *messagesigner.MessageSigner
@ -147,9 +147,9 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
inMsg := *msg
// Redirect to leader if current node is not leader. A single non raft based node is always the leader
if !a.MessageSigner.IsLeader(ctx) {
if !a.RaftAPI.IsLeader(ctx) {
var signedMsg types.SignedMessage
redirected, err := a.MessageSigner.RedirectToLeader(ctx, "MpoolPushMessage", api.MpoolMessageWhole{Msg: msg, Spec: spec}, &signedMsg)
redirected, err := a.RaftAPI.RedirectToLeader(ctx, "MpoolPushMessage", api.MpoolMessageWhole{Msg: msg, Spec: spec}, &signedMsg)
if err != nil {
return nil, err
}
@ -290,10 +290,11 @@ func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error)
return a.Mpool.Updates(ctx)
}
func (a *MpoolAPI) GetRaftState(ctx context.Context) (consensus.State, error) {
return a.MessageSigner.GetRaftState(ctx)
}
func (a *MpoolAPI) RaftLeader(ctx context.Context) (peer.ID, error) {
return a.MessageSigner.Leader(ctx)
}
//func (a *MpoolAPI) GetRaftState(ctx context.Context) (consensus.RaftState, error) {
// state, err := a.MessageSigner.GetRaftState(ctx)
// raftState := state.()
//}
//
//func (a *MpoolAPI) RaftLeader(ctx context.Context) (peer.ID, error) {
// return a.MessageSigner.Leader(ctx)
//}

46
node/impl/full/raft.go Normal file
View File

@ -0,0 +1,46 @@
package full
import (
"context"
"github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/messagesigner"
consensus "github.com/filecoin-project/lotus/lib/consensus/raft"
)
type RaftAPI struct {
fx.In
MessageSigner *messagesigner.MessageSignerConsensus `optional:"true"`
}
func (r *RaftAPI) GetRaftState(ctx context.Context) (*consensus.RaftState, error) {
if r.MessageSigner == nil {
return nil, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
}
return r.MessageSigner.GetRaftState(ctx)
}
func (r *RaftAPI) Leader(ctx context.Context) (peer.ID, error) {
if r.MessageSigner == nil {
return "", xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
}
return r.MessageSigner.Leader(ctx)
}
func (r *RaftAPI) IsLeader(ctx context.Context) bool {
if r.MessageSigner == nil {
return true
}
return r.MessageSigner.IsLeader(ctx)
}
func (r *RaftAPI) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
if r.MessageSigner == nil {
return false, xerrors.Errorf("Raft consensus not enabled. Please check your configuration")
}
return r.MessageSigner.RedirectToLeader(ctx, method, arg, ret)
}