diff --git a/api/api_full.go b/api/api_full.go index ce463f82f..91f290170 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -8,7 +8,6 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - consensus "github.com/libp2p/go-libp2p-consensus" "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/go-address" @@ -30,6 +29,7 @@ import ( lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/power" "github.com/filecoin-project/lotus/chain/types" + consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" ) @@ -753,8 +753,8 @@ type FullNode interface { // the path specified when calling CreateBackup is within the base path CreateBackup(ctx context.Context, fpath string) error //perm:admin - RaftState(ctx context.Context) (consensus.State, error) //perm:read - RaftLeader(ctx context.Context) (peer.ID, error) //perm:read + RaftState(ctx context.Context) (*consensus2.RaftState, error) //perm:read + RaftLeader(ctx context.Context) (peer.ID, error) //perm:read } type StorageAsk struct { diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 40b6e6078..46b2bce1e 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -39,6 +39,7 @@ import ( "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" + consensus "github.com/filecoin-project/lotus/lib/consensus/raft" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" sealing "github.com/filecoin-project/lotus/storage/pipeline" @@ -339,6 +340,10 @@ func init() { addExample(map[string]bitfield.BitField{ "": bitfield.NewFromSet([]uint64{5, 6, 7, 10}), }) + addExample(&consensus.RaftState{ + NonceMap: make(map[address.Address]uint64), + MsgUuids: make(map[uuid.UUID]*types.SignedMessage), + }) } diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index f5b023207..583cbd67f 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -14,7 +14,6 @@ import ( uuid "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" - consensus "github.com/libp2p/go-libp2p-consensus" metrics "github.com/libp2p/go-libp2p/core/metrics" network0 "github.com/libp2p/go-libp2p/core/network" peer "github.com/libp2p/go-libp2p/core/peer" @@ -38,6 +37,7 @@ import ( miner0 "github.com/filecoin-project/lotus/chain/actors/builtin/miner" types "github.com/filecoin-project/lotus/chain/types" alerting "github.com/filecoin-project/lotus/journal/alerting" + consensus "github.com/filecoin-project/lotus/lib/consensus/raft" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" imports "github.com/filecoin-project/lotus/node/repo/imports" ) @@ -2260,10 +2260,10 @@ func (mr *MockFullNodeMockRecorder) RaftLeader(arg0 interface{}) *gomock.Call { } // RaftState mocks base method. -func (m *MockFullNode) RaftState(arg0 context.Context) (consensus.State, error) { +func (m *MockFullNode) RaftState(arg0 context.Context) (*consensus.RaftState, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RaftState", arg0) - ret0, _ := ret[0].(consensus.State) + ret0, _ := ret[0].(*consensus.RaftState) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/api/proxy_gen.go b/api/proxy_gen.go index ff1e560c4..af3a8b515 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -10,7 +10,6 @@ import ( "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - consensus "github.com/libp2p/go-libp2p-consensus" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -37,6 +36,7 @@ import ( lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal/alerting" + consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/imports" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" @@ -342,7 +342,7 @@ type FullNodeStruct struct { RaftLeader func(p0 context.Context) (peer.ID, error) `perm:"read"` - RaftState func(p0 context.Context) (consensus.State, error) `perm:"read"` + RaftState func(p0 context.Context) (*consensus2.RaftState, error) `perm:"read"` StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) `perm:"read"` @@ -2457,15 +2457,15 @@ func (s *FullNodeStub) RaftLeader(p0 context.Context) (peer.ID, error) { return *new(peer.ID), ErrNotSupported } -func (s *FullNodeStruct) RaftState(p0 context.Context) (consensus.State, error) { +func (s *FullNodeStruct) RaftState(p0 context.Context) (*consensus2.RaftState, error) { if s.Internal.RaftState == nil { - return *new(consensus.State), ErrNotSupported + return nil, ErrNotSupported } return s.Internal.RaftState(p0) } -func (s *FullNodeStub) RaftState(p0 context.Context) (consensus.State, error) { - return *new(consensus.State), ErrNotSupported +func (s *FullNodeStub) RaftState(p0 context.Context) (*consensus2.RaftState, error) { + return nil, ErrNotSupported } func (s *FullNodeStruct) StateAccountKey(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) { diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 2216429e1..26c2f5e95 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/chain/messagesigner/messagesigner.go b/chain/messagesigner/messagesigner.go index 433f77dc3..67abf62d8 100644 --- a/chain/messagesigner/messagesigner.go +++ b/chain/messagesigner/messagesigner.go @@ -9,8 +9,6 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" - consensus "github.com/libp2p/go-libp2p-consensus" - "github.com/libp2p/go-libp2p/core/peer" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -38,10 +36,10 @@ type MsgSigner interface { NextNonce(ctx context.Context, addr address.Address) (uint64, error) SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error dstoreKey(addr address.Address) datastore.Key - IsLeader(ctx context.Context) bool - Leader(ctx context.Context) (peer.ID, error) - RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) - GetRaftState(ctx context.Context) (consensus.State, error) + //IsLeader(ctx context.Context) bool + //Leader(ctx context.Context) (peer.ID, error) + //RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) + //GetRaftState(ctx context.Context) (consensus.State, error) } // MessageSigner keeps track of nonces per address, and increments the nonce @@ -200,18 +198,18 @@ func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key { return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()}) } -func (ms *MessageSigner) IsLeader(ctx context.Context) bool { - return true -} - -func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { - return false, xerrors.Errorf("single node shouldn't have any redirects") -} - -func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) { - return nil, xerrors.Errorf("this is a non raft consensus message signer") -} - -func (ms *MessageSigner) Leader(ctx context.Context) (peer.ID, error) { - return "", xerrors.Errorf("no leaders in non raft message signer") -} +//func (ms *MessageSigner) IsLeader(ctx context.Context) bool { +// return true +//} +// +//func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { +// return false, xerrors.Errorf("single node shouldn't have any redirects") +//} +// +//func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) { +// return nil, xerrors.Errorf("this is a non raft consensus message signer") +//} +// +//func (ms *MessageSigner) Leader(ctx context.Context) (peer.ID, error) { +// return "", xerrors.Errorf("no leaders in non raft message signer") +//} diff --git a/chain/messagesigner/messagesigner_consensus.go b/chain/messagesigner/messagesigner_consensus.go index 3d7cba7ac..8573ecd3d 100644 --- a/chain/messagesigner/messagesigner_consensus.go +++ b/chain/messagesigner/messagesigner_consensus.go @@ -6,7 +6,6 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" - libp2pconsensus "github.com/libp2p/go-libp2p-consensus" "github.com/libp2p/go-libp2p/core/peer" "golang.org/x/xerrors" @@ -77,12 +76,12 @@ func (ms *MessageSignerConsensus) SignMessage( } func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) { - state, err := ms.consensus.State(ctx) + cstate, err := ms.consensus.State(ctx) if err != nil { return nil, err } - cstate := state.(consensus.RaftState) + //cstate := state.(consensus.RaftState) msg, ok := cstate.MsgUuids[uuid] if !ok { return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid) @@ -90,7 +89,7 @@ func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uui return msg, nil } -func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (libp2pconsensus.State, error) { +func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (*consensus.RaftState, error) { return ms.consensus.State(ctx) } diff --git a/cli/util/api.go b/cli/util/api.go index 6c673d91f..c34e4ebe0 100644 --- a/cli/util/api.go +++ b/cli/util/api.go @@ -8,6 +8,7 @@ import ( "net/url" "os" "os/signal" + "reflect" "strings" "syscall" @@ -36,7 +37,7 @@ const ( // 2. *_API_INFO environment variables // 3. deprecated *_API_INFO environment variables // 4. *-repo command line flags. -func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { +func GetAPIInfoMulti(ctx *cli.Context, t repo.RepoType) ([]APIInfo, error) { // Check if there was a flag passed with the listen address of the API // server (only used by the tests) for _, f := range t.APIFlags() { @@ -46,7 +47,7 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { strma := ctx.String(f) strma = strings.TrimSpace(strma) - return APIInfo{Addr: strma}, nil + return []APIInfo{APIInfo{Addr: strma}}, nil } // @@ -56,14 +57,14 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { primaryEnv, fallbacksEnvs, deprecatedEnvs := t.APIInfoEnvVars() env, ok := os.LookupEnv(primaryEnv) if ok { - return ParseApiInfo(env), nil + return ParseApiInfoMulti(env), nil } for _, env := range deprecatedEnvs { env, ok := os.LookupEnv(env) if ok { log.Warnf("Using deprecated env(%s) value, please use env(%s) instead.", env, primaryEnv) - return ParseApiInfo(env), nil + return ParseApiInfoMulti(env), nil } } @@ -76,26 +77,26 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { p, err := homedir.Expand(path) if err != nil { - return APIInfo{}, xerrors.Errorf("could not expand home dir (%s): %w", f, err) + return []APIInfo{}, xerrors.Errorf("could not expand home dir (%s): %w", f, err) } r, err := repo.NewFS(p) if err != nil { - return APIInfo{}, xerrors.Errorf("could not open repo at path: %s; %w", p, err) + return []APIInfo{}, xerrors.Errorf("could not open repo at path: %s; %w", p, err) } exists, err := r.Exists() if err != nil { - return APIInfo{}, xerrors.Errorf("repo.Exists returned an error: %w", err) + return []APIInfo{}, xerrors.Errorf("repo.Exists returned an error: %w", err) } if !exists { - return APIInfo{}, errors.New("repo directory does not exist. Make sure your configuration is correct") + return []APIInfo{}, errors.New("repo directory does not exist. Make sure your configuration is correct") } ma, err := r.APIEndpoint() if err != nil { - return APIInfo{}, xerrors.Errorf("could not get api endpoint: %w", err) + return []APIInfo{}, xerrors.Errorf("could not get api endpoint: %w", err) } token, err := r.APIToken() @@ -103,38 +104,80 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { log.Warnf("Couldn't load CLI token, capabilities may be limited: %v", err) } - return APIInfo{ + return []APIInfo{APIInfo{ Addr: ma.String(), Token: token, - }, nil + }}, nil } for _, env := range fallbacksEnvs { env, ok := os.LookupEnv(env) if ok { - return ParseApiInfo(env), nil + return ParseApiInfoMulti(env), nil } } - return APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type()) + return []APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type()) +} + +func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { + ainfos, err := GetAPIInfoMulti(ctx, t) + if err != nil { + return APIInfo{}, err + } + + if len(ainfos) > 1 { + log.Warn("multiple API infos received when only one was expected") + } + + return ainfos[0], nil + +} + +type HttpHead struct { + addr string + header http.Header +} + +func GetRawAPIMulti(ctx *cli.Context, t repo.RepoType, version string) ([]HttpHead, error) { + + var httpHeads []HttpHead + ainfos, err := GetAPIInfoMulti(ctx, t) + if err != nil { + return httpHeads, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err) + } + + for _, ainfo := range ainfos { + addr, err := ainfo.DialArgs(version) + if err != nil { + return httpHeads, xerrors.Errorf("could not get DialArgs: %w", err) + } + httpHeads = append(httpHeads, HttpHead{addr: addr, header: ainfo.AuthHeader()}) + } + + //addr, err := ainfo.DialArgs(version) + //if err != nil { + // return "", nil, xerrors.Errorf("could not get DialArgs: %w", err) + //} + + if IsVeryVerbose { + _, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, httpHeads[0].addr) + } + + return httpHeads, nil } func GetRawAPI(ctx *cli.Context, t repo.RepoType, version string) (string, http.Header, error) { - ainfo, err := GetAPIInfo(ctx, t) + heads, err := GetRawAPIMulti(ctx, t, version) if err != nil { - return "", nil, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err) + return "", nil, err } - addr, err := ainfo.DialArgs(version) - if err != nil { - return "", nil, xerrors.Errorf("could not get DialArgs: %w", err) + if len(heads) > 1 { + log.Warnf("More than 1 header received when expecting only one") } - if IsVeryVerbose { - _, _ = fmt.Fprintf(ctx.App.Writer, "using raw API %s endpoint: %s\n", version, addr) - } - - return addr, ainfo.AuthHeader(), nil + return heads[0].addr, heads[0].header, nil } func GetCommonAPI(ctx *cli.Context) (api.CommonNet, jsonrpc.ClientCloser, error) { @@ -185,6 +228,76 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err return client.NewFullNodeRPCV0(ctx.Context, addr, headers) } +func RouteRequest() { + +} + +func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) { + outs := api.GetInternalStructs(outstr) + + var rins []reflect.Value + //peertoNode := make(map[peer.ID]reflect.Value) + for _, in := range ins { + rin := reflect.ValueOf(in) + rins = append(rins, rin) + //peertoNode[ins] = rin + } + + for _, out := range outs { + rint := reflect.ValueOf(out).Elem() + //ra := reflect.ValueOf(in) + + for f := 0; f < rint.NumField(); f++ { + field := rint.Type().Field(f) + + var fns []reflect.Value + for _, rin := range rins { + fns = append(fns, rin.MethodByName(field.Name)) + } + //fn := ra.MethodByName(field.Name) + + //curr := 0 + //total := len(rins) + + //retryFunc := func(args []reflect.Value) (results []reflect.Value) { + // //ctx := args[0].Interface().(context.Context) + // // + // //rin := peertoNode[ins[0].Leader(ctx)] + // //fn := rin.MethodByName(field.Name) + // // + // //return fn.Call(args) + // + // toCall := curr + // curr += 1 % total + // return fns[toCall].Call(args) + //} + + rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) { + //errorsToRetry := []error{&jsonrpc.RPCConnectionError{}} + //initialBackoff, err := time.ParseDuration("1s") + //if err != nil { + // return nil + //} + //result, err := retry.Retry(5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) { + // //ctx := args[0].Interface().(context.Context) + // // + // //rin := peertoNode[ins[0].Leader(ctx)] + // //fn := rin.MethodByName(field.Name) + // // + // //return fn.Call(args) + // + // toCall := curr + // curr += 1 % total + // result := fns[toCall].Call(args) + // return result, results[len(results)-1].Interface().(error) + //}) + //return result + return fns[0].Call(args) + })) + } + } +} + func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) { if tn, ok := ctx.App.Metadata["testnode-full"]; ok { return tn.(v1api.FullNode), func() {}, nil @@ -214,6 +327,51 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e return v1API, closer, nil } +func GetFullNodeAPIV1New(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) { + if tn, ok := ctx.App.Metadata["testnode-full"]; ok { + return tn.(v1api.FullNode), func() {}, nil + } + + heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1") + if err != nil { + return nil, nil, err + } + + if IsVeryVerbose { + _, _ = fmt.Fprintln(ctx.App.Writer, "using full node API v1 endpoint:", heads[0].addr) + } + + var fullNodes []api.FullNode + var closers []jsonrpc.ClientCloser + + for _, head := range heads { + v1api, closer, err := client.NewFullNodeRPCV1(ctx.Context, head.addr, head.header) + if err != nil { + return nil, nil, err + } + fullNodes = append(fullNodes, v1api) + closers = append(closers, closer) + } + + finalCloser := func() { + for _, c := range closers { + c() + } + } + + var v1API api.FullNodeStruct + FullNodeProxy(fullNodes, &v1API) + + v, err := v1API.Version(ctx.Context) + if err != nil { + return nil, nil, err + } + if !v.APIVersion.EqMajorMinor(api.FullAPIVersion1) { + return nil, nil, xerrors.Errorf("Remote API version didn't match (expected %s, remote %s)", api.FullAPIVersion1, v.APIVersion) + } + return &v1API, finalCloser, nil +} + type GetStorageMinerOptions struct { PreferHttp bool } diff --git a/cli/util/apiinfo.go b/cli/util/apiinfo.go index 41ca18c61..14e27518f 100644 --- a/cli/util/apiinfo.go +++ b/cli/util/apiinfo.go @@ -24,6 +24,7 @@ type APIInfo struct { func ParseApiInfo(s string) APIInfo { var tok []byte + if infoWithToken.Match([]byte(s)) { sp := strings.SplitN(s, ":", 2) tok = []byte(sp[0]) @@ -36,6 +37,31 @@ func ParseApiInfo(s string) APIInfo { } } +func ParseApiInfoMulti(s string) []APIInfo { + var apiInfos []APIInfo + + if infoWithToken.Match([]byte(s)) { + + allAddrs := strings.SplitN(s, ",", -1) + + for _, addr := range allAddrs { + sp := strings.SplitN(addr, ":", 2) + //tok = []byte(sp[0]) + //s = sp[1] + apiInfos = append(apiInfos, APIInfo{ + Addr: sp[1], + Token: []byte(sp[0]), + }) + } + } + + //return APIInfo{ + // Addr: s, + // Token: tok, + //} + return apiInfos +} + func (a APIInfo) DialArgs(version string) (string, error) { ma, err := multiaddr.NewMultiaddr(a.Addr) if err == nil { diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 86a042581..bf4fd0252 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -5069,7 +5069,13 @@ Perms: read Inputs: `null` -Response: `{}` +Response: +```json +{ + "NonceMap": {}, + "MsgUuids": {} +} +``` ## State The State methods are used to query, inspect, and interact with chain state. diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 27162a6ae..e69de29bb 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -1,228 +0,0 @@ -[API] - # Binding address for the Lotus API - # - # type: string - # env var: LOTUS_API_LISTENADDRESS - #ListenAddress = "/ip4/127.0.0.1/tcp/1234/http" - - # type: string - # env var: LOTUS_API_REMOTELISTENADDRESS - #RemoteListenAddress = "" - - # type: Duration - # env var: LOTUS_API_TIMEOUT - #Timeout = "30s" - - -[Backup] - # When set to true disables metadata log (.lotus/kvlog). This can save disk - # space by reducing metadata redundancy. - # - # Note that in case of metadata corruption it might be much harder to recover - # your node if metadata log is disabled - # - # type: bool - # env var: LOTUS_BACKUP_DISABLEMETADATALOG - #DisableMetadataLog = true - - -[Logging] - [Logging.SubsystemLevels] - # env var: LOTUS_LOGGING_SUBSYSTEMLEVELS_EXAMPLE-SUBSYSTEM - #example-subsystem = "INFO" - - -[Libp2p] - # Binding address for the libp2p host - 0 means random port. - # Format: multiaddress; see https://multiformats.io/multiaddr/ - # - # type: []string - # env var: LOTUS_LIBP2P_LISTENADDRESSES - #ListenAddresses = ["/ip4/0.0.0.0/tcp/0", "/ip6/::/tcp/0"] - - # Addresses to explicitally announce to other peers. If not specified, - # all interface addresses are announced - # Format: multiaddress - # - # type: []string - # env var: LOTUS_LIBP2P_ANNOUNCEADDRESSES - #AnnounceAddresses = [] - - # Addresses to not announce - # Format: multiaddress - # - # type: []string - # env var: LOTUS_LIBP2P_NOANNOUNCEADDRESSES - #NoAnnounceAddresses = [] - - # When not disabled (default), lotus asks NAT devices (e.g., routers), to - # open up an external port and forward it to the port lotus is running on. - # When this works (i.e., when your router supports NAT port forwarding), - # it makes the local lotus node accessible from the public internet - # - # type: bool - # env var: LOTUS_LIBP2P_DISABLENATPORTMAP - #DisableNatPortMap = false - - # ConnMgrLow is the number of connections that the basic connection manager - # will trim down to. - # - # type: uint - # env var: LOTUS_LIBP2P_CONNMGRLOW - #ConnMgrLow = 150 - - # ConnMgrHigh is the number of connections that, when exceeded, will trigger - # a connection GC operation. Note: protected/recently formed connections don't - # count towards this limit. - # - # type: uint - # env var: LOTUS_LIBP2P_CONNMGRHIGH - #ConnMgrHigh = 180 - - # ConnMgrGrace is a time duration that new connections are immune from being - # closed by the connection manager. - # - # type: Duration - # env var: LOTUS_LIBP2P_CONNMGRGRACE - #ConnMgrGrace = "20s" - - -[Pubsub] - # Run the node in bootstrap-node mode - # - # type: bool - # env var: LOTUS_PUBSUB_BOOTSTRAPPER - #Bootstrapper = false - - # type: string - # env var: LOTUS_PUBSUB_REMOTETRACER - #RemoteTracer = "" - - -[Client] - # type: bool - # env var: LOTUS_CLIENT_USEIPFS - #UseIpfs = false - - # type: bool - # env var: LOTUS_CLIENT_IPFSONLINEMODE - #IpfsOnlineMode = false - - # type: string - # env var: LOTUS_CLIENT_IPFSMADDR - #IpfsMAddr = "" - - # type: bool - # env var: LOTUS_CLIENT_IPFSUSEFORRETRIEVAL - #IpfsUseForRetrieval = false - - # The maximum number of simultaneous data transfers between the client - # and storage providers for storage deals - # - # type: uint64 - # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORSTORAGE - #SimultaneousTransfersForStorage = 20 - - # The maximum number of simultaneous data transfers between the client - # and storage providers for retrieval deals - # - # type: uint64 - # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORRETRIEVAL - #SimultaneousTransfersForRetrieval = 20 - - # Require that retrievals perform no on-chain operations. Paid retrievals - # without existing payment channels with available funds will fail instead - # of automatically performing on-chain operations. - # - # type: bool - # env var: LOTUS_CLIENT_OFFCHAINRETRIEVAL - #OffChainRetrieval = false - - -[Wallet] - # type: string - # env var: LOTUS_WALLET_REMOTEBACKEND - #RemoteBackend = "" - - # type: bool - # env var: LOTUS_WALLET_ENABLELEDGER - #EnableLedger = false - - # type: bool - # env var: LOTUS_WALLET_DISABLELOCAL - #DisableLocal = false - - -[Fees] - # type: types.FIL - # env var: LOTUS_FEES_DEFAULTMAXFEE - #DefaultMaxFee = "0.07 FIL" - - -[Chainstore] - # type: bool - # env var: LOTUS_CHAINSTORE_ENABLESPLITSTORE - #EnableSplitstore = false - - [Chainstore.Splitstore] - # ColdStoreType specifies the type of the coldstore. - # It can be "universal" (default) or "discard" for discarding cold blocks. - # - # type: string - # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORETYPE - #ColdStoreType = "universal" - - # HotStoreType specifies the type of the hotstore. - # Only currently supported value is "badger". - # - # type: string - # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTORETYPE - #HotStoreType = "badger" - - # MarkSetType specifies the type of the markset. - # It can be "map" for in memory marking or "badger" (default) for on-disk marking. - # - # type: string - # env var: LOTUS_CHAINSTORE_SPLITSTORE_MARKSETTYPE - #MarkSetType = "badger" - - # HotStoreMessageRetention specifies the retention policy for messages, in finalities beyond - # the compaction boundary; default is 0. - # - # type: uint64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMESSAGERETENTION - #HotStoreMessageRetention = 0 - - # HotStoreFullGCFrequency specifies how often to perform a full (moving) GC on the hotstore. - # A value of 0 disables, while a value 1 will do full GC in every compaction. - # Default is 20 (about once a week). - # - # type: uint64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY - #HotStoreFullGCFrequency = 20 - - # EnableColdStoreAutoPrune turns on compaction of the cold store i.e. pruning - # where hotstore compaction occurs every finality epochs pruning happens every 3 finalities - # Default is false - # - # type: bool - # env var: LOTUS_CHAINSTORE_SPLITSTORE_ENABLECOLDSTOREAUTOPRUNE - #EnableColdStoreAutoPrune = false - - # ColdStoreFullGCFrequency specifies how often to performa a full (moving) GC on the coldstore. - # Only applies if auto prune is enabled. A value of 0 disables while a value of 1 will do - # full GC in every prune. - # Default is 7 (about once every a week) - # - # type: uint64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTOREFULLGCFREQUENCY - #ColdStoreFullGCFrequency = 7 - - # ColdStoreRetention specifies the retention policy for data reachable from the chain, in - # finalities beyond the compaction boundary, default is 0, -1 retains everything - # - # type: int64 - # env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORERETENTION - #ColdStoreRetention = 0 - - diff --git a/itests/gateway_test.go b/itests/gateway_test.go index a7d9d353b..b9c861bf3 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -297,7 +297,7 @@ func startNodes( l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - srv, _ := kit.CreateRPCServer(t, handler, l) + srv, _, _ := kit.CreateRPCServer(t, handler, l) // Create a gateway client API that connects to the gateway server var gapi api.Gateway diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 78d10423d..34cf1eb7e 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "net" "net/http" + "reflect" "sync" "testing" "time" @@ -81,7 +82,7 @@ func init() { // // var full TestFullNode // var miner TestMiner -// ens.FullNode(&full, opts...) // populates a full node +// ens.FullNodes(&full, opts...) // populates a full node // ens.Miner(&miner, &full, opts...) // populates a miner, using the full node as its chain daemon // // It is possible to pass functional options to set initial balances, @@ -101,7 +102,7 @@ func init() { // // The API is chainable, so it's possible to do a lot in a very succinct way: // -// kit.NewEnsemble().FullNode(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining() +// kit.NewEnsemble().FullNodes(&full).Miner(&miner, &full).Start().InterconnectAll().BeginMining() // // You can also find convenient fullnode:miner presets, such as 1:1, 1:2, // and 2:1, e.g.: @@ -318,6 +319,12 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO return n } +//func (n *Ensemble) MinerWithMultipleNodes(minerNode *TestMiner, full []*TestFullNode, opts ...NodeOpt) *Ensemble { +// n.MinerEnroll(minerNode, full, opts...) +// n.AddInactiveMiner(minerNode) +// return n +//} + // Worker enrolls a new worker, using the provided full node for chain // interactions. func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...NodeOpt) *Ensemble { @@ -344,6 +351,43 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node return n } +func proxy(ins []api.FullNode, outstr *api.FullNodeStruct) { + outs := api.GetInternalStructs(outstr) + + var rins []reflect.Value + //peertoNode := make(map[peer.ID]reflect.Value) + for _, in := range ins { + rin := reflect.ValueOf(in) + rins = append(rins, rin) + //peertoNode[ins] = rin + } + + for _, out := range outs { + rint := reflect.ValueOf(out).Elem() + //ra := reflect.ValueOf(in) + + for f := 0; f < rint.NumField(); f++ { + field := rint.Type().Field(f) + + var fns []reflect.Value + for _, rin := range rins { + fns = append(fns, rin.MethodByName(field.Name)) + } + //fn := ra.MethodByName(field.Name) + + rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) { + //ctx := args[0].Interface().(context.Context) + // + //rin := peertoNode[ins[0].Leader(ctx)] + //fn := rin.MethodByName(field.Name) + // + //return fn.Call(args) + return fns[0].Call(args) + })) + } + } +} + // Start starts all enrolled nodes. func (n *Ensemble) Start() *Ensemble { ctx := context.Background() @@ -463,10 +507,17 @@ func (n *Ensemble) Start() *Ensemble { err = full.WalletSetDefault(context.Background(), addr) require.NoError(n.t, err) + var rpcShutdownOnce sync.Once // Are we hitting this node through its RPC? if full.options.rpc { - withRPC := fullRpc(n.t, full) + withRPC, rpcCloser := fullRpc(n.t, full) n.inactive.fullnodes[i] = withRPC + full.Stop = func(ctx2 context.Context) error { + //rpcCloser() + rpcShutdownOnce.Do(rpcCloser) + return stop(ctx) + } + n.t.Cleanup(func() { rpcShutdownOnce.Do(rpcCloser) }) } n.t.Cleanup(func() { @@ -563,7 +614,7 @@ func (n *Ensemble) Start() *Ensemble { } // // Set it as the default address. - // err = m.FullNode.WalletSetDefault(ctx, m.OwnerAddr.Address) + // err = m.FullNodes.WalletSetDefault(ctx, m.OwnerAddr.Address) // require.NoError(n.t, err) r := repo.NewMemory(nil) @@ -663,7 +714,21 @@ func (n *Ensemble) Start() *Ensemble { assigner := m.options.minerAssigner disallowRemoteFinalize := m.options.disallowRemoteFinalize + //var wrappedFullNode api.FullNodeStruct + //var fullNodes []api.FullNode + //for _, fn := range m.FullNodes { + // fullNodes = append(fullNodes, fn.FullNode) + //} + //proxy(fullNodes, &wrappedFullNode) + var mineBlock = make(chan lotusminer.MineReq) + + copy := *m.FullNode + copy.FullNode = modules.MakeUuidWrapper(copy.FullNode) + m.FullNode = © + + //m.FullNode.FullNode = modules.MakeUuidWrapper(fn.FullNode) + opts := []node.Option{ node.StorageMiner(&m.StorageMiner, cfg.Subsystems), node.Base(), @@ -671,7 +736,9 @@ func (n *Ensemble) Start() *Ensemble { node.Test(), node.If(m.options.disableLibp2p, node.MockHost(n.mn)), - node.Override(new(v1api.RawFullNodeAPI), m.FullNode.FullNode), + //node.Override(new(v1api.RawFullNodeAPI), func() api.FullNode { return modules.MakeUuidWrapper(m.FullNode) }), + //node.Override(new(v1api.RawFullNodeAPI), modules.MakeUuidWrapper), + node.Override(new(v1api.RawFullNodeAPI), m.FullNode), node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, m.ActorAddr)), // disable resource filtering so that local worker gets assigned tasks diff --git a/itests/kit/node_full.go b/itests/kit/node_full.go index 60d12a9a3..2ed9b89e5 100644 --- a/itests/kit/node_full.go +++ b/itests/kit/node_full.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet/key" + cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/node" ) @@ -44,6 +45,27 @@ type TestFullNode struct { options nodeOpts } +func MergeFullNodes(fullNodes []*TestFullNode) *TestFullNode { + var wrappedFullNode TestFullNode + //var fnapis []api.FullNode + //for _, fullNode := range fullNodes { + // fnapis = append(fnapis, fullNode) + //} + + var fns api.FullNodeStruct + wrappedFullNode.FullNode = &fns + + cliutil.FullNodeProxy(fullNodes, &fns) + + wrappedFullNode.t = fullNodes[0].t + wrappedFullNode.ListenAddr = fullNodes[0].ListenAddr + wrappedFullNode.DefaultKey = fullNodes[0].DefaultKey + wrappedFullNode.Stop = fullNodes[0].Stop + wrappedFullNode.options = fullNodes[0].options + + return &wrappedFullNode +} + func (f TestFullNode) Shutdown(ctx context.Context) error { return f.Stop(ctx) } diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go index 45fb095d5..ef714b78c 100644 --- a/itests/kit/rpc.go +++ b/itests/kit/rpc.go @@ -18,22 +18,29 @@ import ( "github.com/filecoin-project/lotus/node" ) -func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) { +type Closer func() + +func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr, Closer) { testServ := &httptest.Server{ Listener: listener, Config: &http.Server{Handler: handler}, } testServ.Start() - t.Cleanup(func() { - waitUpTo(testServ.Close, time.Second, "Gave up waiting for RPC server to close after 1s") - }) - t.Cleanup(testServ.CloseClientConnections) + //t.Cleanup(func() { + // waitUpTo(testServ.Close, time.Second, "Gave up waiting for RPC server to close after 1s") + //}) + //t.Cleanup(testServ.CloseClientConnections) addr := testServ.Listener.Addr() maddr, err := manet.FromNetAddr(addr) require.NoError(t, err) - return testServ, maddr + closer := func() { + testServ.CloseClientConnections() + testServ.Close() + } + + return testServ, maddr, closer } func waitUpTo(fn func(), waitTime time.Duration, errMsg string) { @@ -51,30 +58,30 @@ func waitUpTo(fn func(), waitTime time.Duration, errMsg string) { } } -func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode { +func fullRpc(t *testing.T, f *TestFullNode) (*TestFullNode, Closer) { handler, err := node.FullNodeHandler(f.FullNode, false) require.NoError(t, err) l, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) - srv, maddr := CreateRPCServer(t, handler, l) + srv, maddr, rpcCloser := CreateRPCServer(t, handler, l) fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String()) sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String()) cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) require.NoError(t, err) - t.Cleanup(stop) + //t.Cleanup(stop) f.ListenAddr, f.FullNode = maddr, cl - return f + return f, func() { stop(); rpcCloser() } } func minerRpc(t *testing.T, m *TestMiner) *TestMiner { handler, err := node.MinerHandler(m.StorageMiner, false) require.NoError(t, err) - srv, maddr := CreateRPCServer(t, handler, m.RemoteListener) + srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener) fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String()) fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String()) @@ -92,7 +99,7 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner { func workerRpc(t *testing.T, m *TestWorker) *TestWorker { handler := sealworker.WorkerHandler(m.MinerNode.AuthVerify, m.FetchHandler, m.Worker, false) - srv, maddr := CreateRPCServer(t, handler, m.RemoteListener) + srv, maddr, _ := CreateRPCServer(t, handler, m.RemoteListener) fmt.Println("creating RPC server for a worker at: ", srv.Listener.Addr().String()) url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0" diff --git a/itests/raft_messagesigner_test.go b/itests/raft_messagesigner_test.go index f52d7a8cd..511ec4eb0 100644 --- a/itests/raft_messagesigner_test.go +++ b/itests/raft_messagesigner_test.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/exitcode" @@ -41,16 +42,16 @@ func generatePrivKey() (*kit.Libp2p, error) { return &kit.Libp2p{PeerID: peerId, PrivKey: privkey}, nil } -func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) consensus.RaftState { +func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *consensus.RaftState { raftState, err := node.RaftState(ctx) require.NoError(t, err) - rstate := raftState.(consensus.RaftState) - return rstate + //rstate := raftState.(*consensus.RaftState) + return raftState } func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *kit.TestFullNode, node2 *kit.TestFullNode, miner *kit.TestMiner) *kit.Ensemble { - blockTime := 5 * time.Millisecond + blockTime := 1000 * time.Millisecond pkey0, _ := generatePrivKey() pkey1, _ := generatePrivKey() @@ -72,13 +73,17 @@ func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *ki node.Override(node.GoRPCServer, modules.NewRPCServer), ) //raftOps := kit.ConstructorOpts() + kit.ThroughRPC() - ens := kit.NewEnsemble(t).FullNode(node0, raftOps).FullNode(node1, raftOps).FullNode(node2, raftOps) + ens := kit.NewEnsemble(t).FullNode(node0, raftOps, kit.ThroughRPC()).FullNode(node1, raftOps, kit.ThroughRPC()).FullNode(node2, raftOps, kit.ThroughRPC()) node0.AssignPrivKey(pkey0) node1.AssignPrivKey(pkey1) node2.AssignPrivKey(pkey2) - ens.MinerEnroll(miner, node0, kit.WithAllSubsystems()) + nodes := []*kit.TestFullNode{node0, node1, node2} + wrappedFullNode := kit.MergeFullNodes(nodes) + + ens.MinerEnroll(miner, wrappedFullNode, kit.WithAllSubsystems(), kit.ThroughRPC()) ens.Start() // Import miner wallet to all nodes @@ -234,3 +239,143 @@ func TestRaftStateLeaderDisconnects(t *testing.T) { } } } + +func TestRaftStateMiner(t *testing.T) { + + kit.QuietMiningLogs() + ctx := context.Background() + + var ( + node0 kit.TestFullNode + node1 kit.TestFullNode + node2 kit.TestFullNode + miner kit.TestMiner + ) + + setup(ctx, t, &node0, &node1, &node2, &miner) + + fmt.Println(node0.WalletList(context.Background())) + fmt.Println(node1.WalletList(context.Background())) + fmt.Println(node2.WalletList(context.Background())) + + bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) + require.NoError(t, err) + + msgHalfBal := &types.Message{ + From: miner.OwnerKey.Address, + To: node0.DefaultKey.Address, + Value: big.Div(bal, big.NewInt(2)), + } + mu := uuid.New() + smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ + MsgUuid: mu, + }) + require.NoError(t, err) + mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) + + rstate0 := getRaftState(ctx, t, &node0) + rstate1 := getRaftState(ctx, t, &node1) + rstate2 := getRaftState(ctx, t, &node2) + + require.EqualValues(t, rstate0, rstate1) + require.EqualValues(t, rstate0, rstate2) +} + +func TestRaftStateLeaderDisconnectsMiner(t *testing.T) { + + kit.QuietMiningLogs() + ctx := context.Background() + + var ( + node0 kit.TestFullNode + node1 kit.TestFullNode + node2 kit.TestFullNode + miner kit.TestMiner + ) + + nodes := []*kit.TestFullNode{&node0, &node1, &node2} + + setup(ctx, t, &node0, &node1, &node2, &miner) + + peerToNode := make(map[peer.ID]*kit.TestFullNode) + for _, n := range nodes { + peerToNode[n.Pkey.PeerID] = n + } + + bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) + require.NoError(t, err) + + msgHalfBal := &types.Message{ + From: miner.OwnerKey.Address, + To: node0.DefaultKey.Address, + Value: big.Div(bal, big.NewInt(2)), + } + mu := uuid.New() + smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{ + MsgUuid: mu, + }) + require.NoError(t, err) + mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) + + rstate0 := getRaftState(ctx, t, &node0) + rstate1 := getRaftState(ctx, t, &node1) + rstate2 := getRaftState(ctx, t, &node2) + + require.True(t, reflect.DeepEqual(rstate0, rstate1)) + require.True(t, reflect.DeepEqual(rstate0, rstate2)) + + leader, err := node1.RaftLeader(ctx) + require.NoError(t, err) + leaderNode := peerToNode[leader] + // + //err = leaderNode.Stop(ctx) + //require.NoError(t, err) + //oldLeaderNode := leaderNode + // + //time.Sleep(5 * time.Second) + // + //newLeader := leader + //for _, n := range nodes { + // if n != leaderNode { + // newLeader, err = n.RaftLeader(ctx) + // require.NoError(t, err) + // require.NotEqual(t, newLeader, leader) + // } + //} + // + //require.NotEqual(t, newLeader, leader) + //leaderNode = peerToNode[newLeader] + + err = node0.Stop(ctx) + require.NoError(t, err) + + msg2 := &types.Message{ + From: miner.OwnerKey.Address, + To: node0.DefaultKey.Address, + Value: big.NewInt(100000), + } + mu2 := uuid.New() + + time.Sleep(5 * time.Second) + signedMsg2, err := miner.FullNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{ + MaxFee: abi.TokenAmount(config.DefaultDefaultMaxFee), + MsgUuid: mu2, + }) + require.NoError(t, err) + mLookup, err = leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) + + //rstate := getRaftState(ctx, t, leaderNode) + + //for _, n := range nodes { + // if n != oldLeaderNode { + // rs := getRaftState(ctx, t, n) + // require.True(t, reflect.DeepEqual(rs, rstate)) + // } + //} +} diff --git a/lib/consensus/raft/consensus.go b/lib/consensus/raft/consensus.go index 1b9e082cd..76a01f426 100644 --- a/lib/consensus/raft/consensus.go +++ b/lib/consensus/raft/consensus.go @@ -4,6 +4,7 @@ package consensus import ( "context" + "encoding/json" "errors" "fmt" "sort" @@ -28,16 +29,69 @@ import ( var logger = logging.Logger("raft") +type NonceMapType map[addr.Address]uint64 +type MsgUuidMapType map[uuid.UUID]*types.SignedMessage + type RaftState struct { - NonceMap map[addr.Address]uint64 - MsgUuids map[uuid.UUID]*types.SignedMessage + NonceMap NonceMapType + MsgUuids MsgUuidMapType } -func newRaftState() RaftState { - return RaftState{NonceMap: make(map[addr.Address]uint64), +func newRaftState() *RaftState { + return &RaftState{NonceMap: make(map[addr.Address]uint64), MsgUuids: make(map[uuid.UUID]*types.SignedMessage)} } +func (n *NonceMapType) MarshalJSON() ([]byte, error) { + marshalled := make(map[string]uint64) + for a, n := range *n { + marshalled[a.String()] = n + } + return json.Marshal(marshalled) +} + +func (n *NonceMapType) UnmarshalJSON(b []byte) error { + unmarshalled := make(map[string]uint64) + err := json.Unmarshal(b, &unmarshalled) + if err != nil { + return err + } + *n = make(map[addr.Address]uint64) + for saddr, nonce := range unmarshalled { + a, err := addr.NewFromString(saddr) + if err != nil { + return err + } + (*n)[a] = nonce + } + return nil +} + +func (m *MsgUuidMapType) MarshalJSON() ([]byte, error) { + marshalled := make(map[string]*types.SignedMessage) + for u, msg := range *m { + marshalled[u.String()] = msg + } + return json.Marshal(marshalled) +} + +func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error { + unmarshalled := make(map[string]*types.SignedMessage) + err := json.Unmarshal(b, &unmarshalled) + if err != nil { + return err + } + *m = make(map[uuid.UUID]*types.SignedMessage) + for suid, msg := range unmarshalled { + u, err := uuid.Parse(suid) + if err != nil { + return err + } + (*m)[u] = msg + } + return nil +} + type ConsensusOp struct { Nonce uint64 `codec:"nonce,omitempty"` Uuid uuid.UUID `codec:"uuid,omitempty"` @@ -46,7 +100,7 @@ type ConsensusOp struct { } func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) { - s := state.(RaftState) + s := state.(*RaftState) s.NonceMap[c.Addr] = c.Nonce s.MsgUuids[c.Uuid] = c.SignedMsg return s, nil @@ -67,7 +121,7 @@ type Consensus struct { consensus consensus.OpLogConsensus actor consensus.Actor raft *raftWrapper - state RaftState + state *RaftState rpcClient *rpc.Client rpcReady chan struct{} @@ -138,7 +192,8 @@ func NewConsensusWithRPCClient(staging bool) func(host host.Host, if err != nil { return nil, err } - cc.SetClient(rpcClient) + cc.rpcClient = rpcClient + cc.rpcReady <- struct{}{} return cc, nil } } @@ -245,12 +300,6 @@ func (cc *Consensus) Shutdown(ctx context.Context) error { return nil } -// SetClient makes the component ready to perform RPC requets -func (cc *Consensus) SetClient(c *rpc.Client) { - cc.rpcClient = c - cc.rpcReady <- struct{}{} -} - // Ready returns a channel which is signaled when the Consensus // algorithm has finished bootstrapping and is ready to use func (cc *Consensus) Ready(ctx context.Context) <-chan struct{} { @@ -451,7 +500,7 @@ func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error { // last agreed-upon RaftState known by this node. No writes are allowed, as all // writes to the shared state should happen through the Consensus component // methods. -func (cc *Consensus) State(ctx context.Context) (consensus.State, error) { +func (cc *Consensus) State(ctx context.Context) (*RaftState, error) { //_, span := trace.StartSpan(ctx, "consensus/RaftState") //defer span.End() @@ -463,7 +512,7 @@ func (cc *Consensus) State(ctx context.Context) (consensus.State, error) { if err != nil { return nil, err } - state, ok := st.(RaftState) + state, ok := st.(*RaftState) if !ok { return nil, errors.New("wrong state type") } diff --git a/lib/consensus/raft/interfaces.go b/lib/consensus/raft/interfaces.go index 57c83da2e..2b77d1ebe 100644 --- a/lib/consensus/raft/interfaces.go +++ b/lib/consensus/raft/interfaces.go @@ -4,21 +4,10 @@ import ( "context" consensus "github.com/libp2p/go-libp2p-consensus" - rpc "github.com/libp2p/go-libp2p-gorpc" "github.com/libp2p/go-libp2p/core/peer" ) -// Component represents a piece of ipfscluster. Cluster components -// usually run their own goroutines (a http server for example). They -// communicate with the main Cluster component and other components -// (both local and remote), using an instance of rpc.Client. -type Component interface { - SetClient(*rpc.Client) - Shutdown(context.Context) error -} - type ConsensusAPI interface { - Component // Returns a channel to signal that the consensus layer is ready // allowing the main component to wait for it during start. Ready(context.Context) <-chan struct{} @@ -47,4 +36,6 @@ type ConsensusAPI interface { Distrust(context.Context, peer.ID) error // Returns true if current node is the cluster leader IsLeader(ctx context.Context) bool + + Shutdown(context.Context) error } diff --git a/node/impl/full.go b/node/impl/full.go index da104b1b8..dc2568de3 100644 --- a/node/impl/full.go +++ b/node/impl/full.go @@ -5,11 +5,11 @@ import ( "time" logging "github.com/ipfs/go-log/v2" - consensus "github.com/libp2p/go-libp2p-consensus" "github.com/libp2p/go-libp2p/core/peer" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + consensus2 "github.com/filecoin-project/lotus/lib/consensus/raft" "github.com/filecoin-project/lotus/node/impl/client" "github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/full" @@ -35,6 +35,7 @@ type FullNodeAPI struct { full.MsigAPI full.WalletAPI full.SyncAPI + full.RaftAPI DS dtypes.MetadataDS NetworkName dtypes.NetworkName @@ -118,8 +119,12 @@ func (n *FullNodeAPI) NodeStatus(ctx context.Context, inclChainStatus bool) (sta return status, nil } -func (n *FullNodeAPI) RaftState(ctx context.Context) (consensus.State, error) { - return n.MpoolAPI.GetRaftState(ctx) +func (n *FullNodeAPI) RaftState(ctx context.Context) (*consensus2.RaftState, error) { + return n.RaftAPI.GetRaftState(ctx) +} + +func (n *FullNodeAPI) RaftLeader(ctx context.Context) (peer.ID, error) { + return n.RaftAPI.Leader(ctx) } var _ api.FullNode = &FullNodeAPI{} diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 9bd4b5e18..d7ef8de53 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -5,8 +5,6 @@ import ( "encoding/json" "github.com/ipfs/go-cid" - consensus "github.com/libp2p/go-libp2p-consensus" - "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/fx" "golang.org/x/xerrors" @@ -46,6 +44,8 @@ type MpoolAPI struct { GasAPI + RaftAPI + MessageSigner messagesigner.MsgSigner // MessageSigner *messagesigner.MessageSigner @@ -147,9 +147,9 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe inMsg := *msg // Redirect to leader if current node is not leader. A single non raft based node is always the leader - if !a.MessageSigner.IsLeader(ctx) { + if !a.RaftAPI.IsLeader(ctx) { var signedMsg types.SignedMessage - redirected, err := a.MessageSigner.RedirectToLeader(ctx, "MpoolPushMessage", api.MpoolMessageWhole{Msg: msg, Spec: spec}, &signedMsg) + redirected, err := a.RaftAPI.RedirectToLeader(ctx, "MpoolPushMessage", api.MpoolMessageWhole{Msg: msg, Spec: spec}, &signedMsg) if err != nil { return nil, err } @@ -290,10 +290,11 @@ func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) return a.Mpool.Updates(ctx) } -func (a *MpoolAPI) GetRaftState(ctx context.Context) (consensus.State, error) { - return a.MessageSigner.GetRaftState(ctx) -} - -func (a *MpoolAPI) RaftLeader(ctx context.Context) (peer.ID, error) { - return a.MessageSigner.Leader(ctx) -} +//func (a *MpoolAPI) GetRaftState(ctx context.Context) (consensus.RaftState, error) { +// state, err := a.MessageSigner.GetRaftState(ctx) +// raftState := state.() +//} +// +//func (a *MpoolAPI) RaftLeader(ctx context.Context) (peer.ID, error) { +// return a.MessageSigner.Leader(ctx) +//} diff --git a/node/impl/full/raft.go b/node/impl/full/raft.go new file mode 100644 index 000000000..115a9e26e --- /dev/null +++ b/node/impl/full/raft.go @@ -0,0 +1,46 @@ +package full + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/messagesigner" + consensus "github.com/filecoin-project/lotus/lib/consensus/raft" +) + +type RaftAPI struct { + fx.In + + MessageSigner *messagesigner.MessageSignerConsensus `optional:"true"` +} + +func (r *RaftAPI) GetRaftState(ctx context.Context) (*consensus.RaftState, error) { + if r.MessageSigner == nil { + return nil, xerrors.Errorf("Raft consensus not enabled. Please check your configuration") + } + return r.MessageSigner.GetRaftState(ctx) +} + +func (r *RaftAPI) Leader(ctx context.Context) (peer.ID, error) { + if r.MessageSigner == nil { + return "", xerrors.Errorf("Raft consensus not enabled. Please check your configuration") + } + return r.MessageSigner.Leader(ctx) +} + +func (r *RaftAPI) IsLeader(ctx context.Context) bool { + if r.MessageSigner == nil { + return true + } + return r.MessageSigner.IsLeader(ctx) +} + +func (r *RaftAPI) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) { + if r.MessageSigner == nil { + return false, xerrors.Errorf("Raft consensus not enabled. Please check your configuration") + } + return r.MessageSigner.RedirectToLeader(ctx, method, arg, ret) +}