solution for mining loop hitting the same node

This commit is contained in:
Shrenuj Bansal 2022-10-05 15:32:24 +00:00
parent dde204fb6a
commit 98481821d8
3 changed files with 26 additions and 39 deletions

View File

@ -15,7 +15,6 @@ import (
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.uber.org/atomic"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
@ -231,13 +230,9 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err
return client.NewFullNodeRPCV0(ctx.Context, addr, headers) return client.NewFullNodeRPCV0(ctx.Context, addr, headers)
} }
func RouteRequest() {
}
// Not thread safe // Not thread safe
func OnSingleNode(ctx context.Context) context.Context { func OnSingleNode(ctx context.Context) context.Context {
return context.WithValue(ctx, "retry node", new(*int)) return context.WithValue(ctx, "retry-node", new(*int))
} }
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) { func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
@ -253,7 +248,6 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
for _, out := range outs { for _, out := range outs {
rint := reflect.ValueOf(out).Elem() rint := reflect.ValueOf(out).Elem()
//ra := reflect.ValueOf(in)
for f := 0; f < rint.NumField(); f++ { for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f) field := rint.Type().Field(f)
@ -262,20 +256,6 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
for _, rin := range rins { for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name)) fns = append(fns, rin.MethodByName(field.Name))
} }
//fn := ra.MethodByName(field.Name)
//retryFunc := func(args []reflect.Value) (results []reflect.Value) {
// //ctx := args[0].Interface().(context.Context)
// //
// //rin := peertoNode[ins[0].Leader(ctx)]
// //fn := rin.MethodByName(field.Name)
// //
// //return fn.Call(args)
//
// toCall := curr
// curr += 1 % total
// return fns[toCall].Call(args)
//}
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) { rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}} errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
@ -283,10 +263,22 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
if err != nil { if err != nil {
return nil return nil
} }
var curr atomic.Int64
curr.Store(-1)
total := len(rins)
ctx := args[0].Interface().(context.Context) ctx := args[0].Interface().(context.Context)
curr := -1
// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
if ctx.Value("retry-node") != nil {
if (*ctx.Value("retry-node").(**int)) == nil {
*ctx.Value("retry-node").(**int) = &curr
} else {
curr = **ctx.Value("retry-node").(**int) - 1
}
}
total := len(rins)
result, err := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) { result, err := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) {
//ctx := args[0].Interface().(context.Context) //ctx := args[0].Interface().(context.Context)
// //
@ -294,21 +286,9 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
//fn := rin.MethodByName(field.Name) //fn := rin.MethodByName(field.Name)
// //
//return fn.Call(args) //return fn.Call(args)
curr = (curr + 1) % total
// for calls that need to be performed on the same node result := fns[curr].Call(args)
// primarily for miner when calling create block and submit block subsequently
var toCall int64
if ctx.Value("retry node") == nil {
toCall = curr.Inc() % int64(total)
} else if (*ctx.Value("retry node").(**int64)) == nil {
toCall = curr.Inc() % int64(total)
*ctx.Value("retry node").(**int64) = &toCall
} else {
toCall = **ctx.Value("retry node").(**int64)
}
//toCall := curr.Inc() % int64(total)
result := fns[toCall].Call(args)
if result[len(result)-1].IsNil() { if result[len(result)-1].IsNil() {
return result, nil return result, nil
} }
@ -316,7 +296,6 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
return result, e return result, e
}) })
return result return result
//return fns[0].Call(args)
})) }))
} }
} }

View File

@ -359,6 +359,9 @@ func TestRaftStateLeaderDisconnectsMiner(t *testing.T) {
} }
} }
// Miner sends message on leader
// Leader disconnects
// Call StateWaitMsg on new leader
func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) { func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) {
kit.QuietMiningLogs() kit.QuietMiningLogs()
@ -405,6 +408,7 @@ func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
// Check if all active nodes update their leader
newLeader := leader newLeader := leader
for _, n := range nodes { for _, n := range nodes {
if n != leaderNode { if n != leaderNode {
@ -423,6 +427,7 @@ func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) {
rstate := getRaftState(ctx, t, leaderNode) rstate := getRaftState(ctx, t, leaderNode)
// Check if Raft state is consistent on all active nodes
for _, n := range nodes { for _, n := range nodes {
if n != oldLeaderNode { if n != oldLeaderNode {
rs := getRaftState(ctx, t, n) rs := getRaftState(ctx, t, n)

View File

@ -6,6 +6,7 @@ import (
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
cliutil "github.com/filecoin-project/lotus/cli/util"
"os" "os"
"sync" "sync"
"time" "time"
@ -208,6 +209,8 @@ func (m *Miner) mine(ctx context.Context) {
var lastBase MiningBase var lastBase MiningBase
minerLoop: minerLoop:
for { for {
ctx := cliutil.OnSingleNode(ctx)
select { select {
case <-m.stop: case <-m.stop:
stopping := m.stopping stopping := m.stopping