Retries within proxy working

This commit is contained in:
Shrenuj Bansal 2022-09-28 15:07:05 +00:00
parent 559c2c6d34
commit 570f61438a
5 changed files with 186 additions and 82 deletions

View File

@@ -4,6 +4,8 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/retry"
"go.uber.org/atomic"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@@ -11,6 +13,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@@ -256,9 +259,6 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
} }
//fn := ra.MethodByName(field.Name) //fn := ra.MethodByName(field.Name)
//curr := 0
//total := len(rins)
//retryFunc := func(args []reflect.Value) (results []reflect.Value) { //retryFunc := func(args []reflect.Value) (results []reflect.Value) {
// //ctx := args[0].Interface().(context.Context) // //ctx := args[0].Interface().(context.Context)
// // // //
@@ -273,26 +273,34 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
//} //}
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) { rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
//errorsToRetry := []error{&jsonrpc.RPCConnectionError{}} errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
//initialBackoff, err := time.ParseDuration("1s") initialBackoff, err := time.ParseDuration("1s")
//if err != nil { if err != nil {
// return nil return nil
//} }
//result, err := retry.Retry(5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) { var curr atomic.Int64
// //ctx := args[0].Interface().(context.Context) curr.Store(-1)
// // total := len(rins)
// //rin := peertoNode[ins[0].Leader(ctx)] ctx := args[0].Interface().(context.Context)
// //fn := rin.MethodByName(field.Name) result, err := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() (results []reflect.Value, err2 error) {
// // //ctx := args[0].Interface().(context.Context)
// //return fn.Call(args)
// //
// toCall := curr //rin := peertoNode[ins[0].Leader(ctx)]
// curr += 1 % total //fn := rin.MethodByName(field.Name)
// result := fns[toCall].Call(args) //
// return result, results[len(results)-1].Interface().(error) //return fn.Call(args)
//})
//return result toCall := curr.Inc() % int64(total)
return fns[0].Call(args)
result := fns[toCall].Call(args)
if result[len(result)-1].IsNil() {
return result, nil
}
e := result[len(result)-1].Interface().(error)
return result, e
})
return result
//return fns[0].Call(args)
})) }))
} }
} }

View File

@@ -51,7 +51,7 @@ func getRaftState(ctx context.Context, t *testing.T, node *kit.TestFullNode) *co
func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *kit.TestFullNode, node2 *kit.TestFullNode, miner *kit.TestMiner) *kit.Ensemble { func setup(ctx context.Context, t *testing.T, node0 *kit.TestFullNode, node1 *kit.TestFullNode, node2 *kit.TestFullNode, miner *kit.TestMiner) *kit.Ensemble {
blockTime := 1000 * time.Millisecond //blockTime := 1000 * time.Millisecond
pkey0, _ := generatePrivKey() pkey0, _ := generatePrivKey()
pkey1, _ := generatePrivKey() pkey1, _ := generatePrivKey()
@@ -304,6 +304,108 @@ func TestRaftStateLeaderDisconnectsMiner(t *testing.T) {
peerToNode[n.Pkey.PeerID] = n peerToNode[n.Pkey.PeerID] = n
} }
//bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address)
//require.NoError(t, err)
//msgHalfBal := &types.Message{
// From: miner.OwnerKey.Address,
// To: node0.DefaultKey.Address,
// Value: big.Div(bal, big.NewInt(2)),
//}
//mu := uuid.New()
//smHalfBal, err := miner.FullNode.MpoolPushMessage(ctx, msgHalfBal, &api.MessageSendSpec{
// MsgUuid: mu,
//})
//require.NoError(t, err)
//mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true)
//require.NoError(t, err)
//require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
//
//rstate0 := getRaftState(ctx, t, &node0)
//rstate1 := getRaftState(ctx, t, &node1)
//rstate2 := getRaftState(ctx, t, &node2)
//
//require.True(t, reflect.DeepEqual(rstate0, rstate1))
//require.True(t, reflect.DeepEqual(rstate0, rstate2))
// Take leader node down
leader, err := node1.RaftLeader(ctx)
require.NoError(t, err)
leaderNode := peerToNode[leader]
err = leaderNode.Stop(ctx)
require.NoError(t, err)
oldLeaderNode := leaderNode
time.Sleep(5 * time.Second)
newLeader := leader
for _, n := range nodes {
if n != leaderNode {
newLeader, err = n.RaftLeader(ctx)
require.NoError(t, err)
require.NotEqual(t, newLeader, leader)
}
}
require.NotEqual(t, newLeader, leader)
leaderNode = peerToNode[newLeader]
fmt.Println("New leader: ", newLeader)
//err = node0.Stop(ctx)
//require.NoError(t, err)
msg2 := &types.Message{
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.NewInt(100000),
}
mu2 := uuid.New()
signedMsg2, err := miner.FullNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{
MaxFee: abi.TokenAmount(config.DefaultDefaultMaxFee),
MsgUuid: mu2,
})
require.NoError(t, err)
mLookup, err := leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
fmt.Println("!!!!!!!!!!!!!!!!TEST FINISHED!!!!!!!!!!!!!!!!!!!")
rstate := getRaftState(ctx, t, leaderNode)
for _, n := range nodes {
if n != oldLeaderNode {
rs := getRaftState(ctx, t, n)
require.True(t, reflect.DeepEqual(rs, rstate))
}
}
}
func TestLeaderDisconnectsCheckMsgStateOnNewLeader(t *testing.T) {
kit.QuietMiningLogs()
ctx := context.Background()
var (
node0 kit.TestFullNode
node1 kit.TestFullNode
node2 kit.TestFullNode
miner kit.TestMiner
)
nodes := []*kit.TestFullNode{&node0, &node1, &node2}
setup(ctx, t, &node0, &node1, &node2, &miner)
peerToNode := make(map[peer.ID]*kit.TestFullNode)
for _, n := range nodes {
peerToNode[n.Pkey.PeerID] = n
}
bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address) bal, err := node0.WalletBalance(ctx, node0.DefaultKey.Address)
require.NoError(t, err) require.NoError(t, err)
@@ -317,65 +419,55 @@ func TestRaftStateLeaderDisconnectsMiner(t *testing.T) {
MsgUuid: mu, MsgUuid: mu,
}) })
require.NoError(t, err) require.NoError(t, err)
mLookup, err := node0.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
rstate0 := getRaftState(ctx, t, &node0) //
rstate1 := getRaftState(ctx, t, &node1) //rstate0 := getRaftState(ctx, t, &node0)
rstate2 := getRaftState(ctx, t, &node2) //rstate1 := getRaftState(ctx, t, &node1)
//rstate2 := getRaftState(ctx, t, &node2)
require.True(t, reflect.DeepEqual(rstate0, rstate1)) //
require.True(t, reflect.DeepEqual(rstate0, rstate2)) //require.True(t, reflect.DeepEqual(rstate0, rstate1))
//require.True(t, reflect.DeepEqual(rstate0, rstate2))
// Take leader node down
leader, err := node1.RaftLeader(ctx) leader, err := node1.RaftLeader(ctx)
require.NoError(t, err) require.NoError(t, err)
leaderNode := peerToNode[leader] leaderNode := peerToNode[leader]
//
//err = leaderNode.Stop(ctx) err = leaderNode.Stop(ctx)
//require.NoError(t, err) require.NoError(t, err)
//oldLeaderNode := leaderNode oldLeaderNode := leaderNode
//
//time.Sleep(5 * time.Second) //time.Sleep(5 * time.Second)
//
//newLeader := leader
//for _, n := range nodes {
// if n != leaderNode {
// newLeader, err = n.RaftLeader(ctx)
// require.NoError(t, err)
// require.NotEqual(t, newLeader, leader)
// }
//}
//
//require.NotEqual(t, newLeader, leader)
//leaderNode = peerToNode[newLeader]
err = node0.Stop(ctx) newLeader := leader
for _, n := range nodes {
if n != leaderNode {
newLeader, err = n.RaftLeader(ctx)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, newLeader, leader)
msg2 := &types.Message{ }
From: miner.OwnerKey.Address,
To: node0.DefaultKey.Address,
Value: big.NewInt(100000),
} }
mu2 := uuid.New()
time.Sleep(5 * time.Second) require.NotEqual(t, newLeader, leader)
signedMsg2, err := miner.FullNode.MpoolPushMessage(ctx, msg2, &api.MessageSendSpec{ leaderNode = peerToNode[newLeader]
MaxFee: abi.TokenAmount(config.DefaultDefaultMaxFee),
MsgUuid: mu2, fmt.Println("New leader: ", newLeader)
})
require.NoError(t, err) mLookup, err := leaderNode.StateWaitMsg(ctx, smHalfBal.Cid(), 3, api.LookbackNoLimit, true)
mLookup, err = leaderNode.StateWaitMsg(ctx, signedMsg2.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode) require.Equal(t, exitcode.Ok, mLookup.Receipt.ExitCode)
//rstate := getRaftState(ctx, t, leaderNode) //err = node0.Stop(ctx)
//require.NoError(t, err)
//for _, n := range nodes { fmt.Println("!!!!!!!!!!!!!!!!TEST FINISHED!!!!!!!!!!!!!!!!!!!")
// if n != oldLeaderNode {
// rs := getRaftState(ctx, t, n) rstate := getRaftState(ctx, t, leaderNode)
// require.True(t, reflect.DeepEqual(rs, rstate))
// } for _, n := range nodes {
//} if n != oldLeaderNode {
rs := getRaftState(ctx, t, n)
require.True(t, reflect.DeepEqual(rs, rstate))
}
}
} }

View File

@@ -1,6 +1,7 @@
package retry package retry
import ( import (
"context"
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@@ -10,10 +11,11 @@ import (
var log = logging.Logger("retry") var log = logging.Logger("retry")
func Retry[T any](attempts int, initialBackoff time.Duration, errorTypes []error, f func() (T, error)) (result T, err error) { func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, errorTypes []error, f func() (T, error)) (result T, err error) {
for i := 0; i < attempts; i++ { for i := 0; i < attempts; i++ {
if i > 0 { if i > 0 {
log.Info("Retrying after error:", err) log.Info("Retrying after error:", err)
//debug.PrintStack()
time.Sleep(initialBackoff) time.Sleep(initialBackoff)
initialBackoff *= 2 initialBackoff *= 2
} }
@@ -21,6 +23,9 @@ func Retry[T any](attempts int, initialBackoff time.Duration, errorTypes []error
if err == nil || !api.ErrorIsIn(err, errorTypes) { if err == nil || !api.ErrorIsIn(err, errorTypes) {
return result, err return result, err
} }
if ctx.Err() != nil {
return result, ctx.Err()
}
} }
log.Errorf("Failed after %d attempts, last error: %s", attempts, err) log.Errorf("Failed after %d attempts, last error: %s", attempts, err)
return result, err return result, err

View File

@@ -2,7 +2,6 @@ package full
import ( import (
"context" "context"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"

View File

@@ -37,7 +37,6 @@ import (
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@@ -56,7 +55,6 @@ import (
"github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/retry"
"github.com/filecoin-project/lotus/markets" "github.com/filecoin-project/lotus/markets"
"github.com/filecoin-project/lotus/markets/dagstore" "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/markets/idxprov" "github.com/filecoin-project/lotus/markets/idxprov"
@@ -89,12 +87,14 @@ func (a *UuidWrapper) MpoolPushMessage(ctx context.Context, msg *types.Message,
spec = new(api.MessageSendSpec) spec = new(api.MessageSendSpec)
} }
spec.MsgUuid = uuid.New() spec.MsgUuid = uuid.New()
errorsToRetry := []error{&jsonrpc.RPCConnectionError{}} return a.FullNode.MpoolPushMessage(ctx, msg, spec)
initialBackoff, err := time.ParseDuration("1s")
if err != nil { //errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
return nil, err //initialBackoff, err := time.ParseDuration("1s")
} //if err != nil {
return retry.Retry(5, initialBackoff, errorsToRetry, func() (*types.SignedMessage, error) { return a.FullNode.MpoolPushMessage(ctx, msg, spec) }) // return nil, err
//}
//return retry.Retry(5, initialBackoff, errorsToRetry, func() (*types.SignedMessage, error) { return a.FullNode.MpoolPushMessage(ctx, msg, spec) })
} }
func MakeUuidWrapper(a v1api.RawFullNodeAPI) v1api.FullNode { func MakeUuidWrapper(a v1api.RawFullNodeAPI) v1api.FullNode {