Ignore mpool msg existing errors for applying raft state

This commit is contained in:
Shrenuj Bansal 2022-10-06 11:44:13 +00:00
parent 98481821d8
commit 17a77220c2
6 changed files with 25 additions and 51 deletions

View File

@ -82,6 +82,7 @@ var (
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTooManyPendingMessages = errors.New("too many pending messages for actor")
ErrNonceGap = errors.New("unfulfilled nonce gap")
ErrExistingNonce = errors.New("message with nonce already exists")
)
const (
@ -276,7 +277,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted
}
} else {
return false, xerrors.Errorf("message from %s with nonce %d already in mpool: %w",
m.Message.From, m.Message.Nonce, ErrSoftValidationFailure)
m.Message.From, m.Message.Nonce, ErrExistingNonce)
}
ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)

View File

@ -67,7 +67,12 @@ func (ms *MessageSignerConsensus) SignMessage(
u = spec.MsgUuid
}
op := &consensus.ConsensusOp{signedMsg.Message.Nonce, u, signedMsg.Message.From, signedMsg}
op := &consensus.ConsensusOp{
Nonce: signedMsg.Message.Nonce,
Uuid: u,
Addr: signedMsg.Message.From,
SignedMsg: signedMsg,
}
err = ms.consensus.Commit(ctx, op)
if err != nil {
return nil, err

View File

@ -9,7 +9,6 @@ import (
"io/ioutil"
"net"
"net/http"
"reflect"
"sync"
"testing"
"time"
@ -351,43 +350,6 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
return n
}
func proxy(ins []api.FullNode, outstr *api.FullNodeStruct) {
outs := api.GetInternalStructs(outstr)
var rins []reflect.Value
//peertoNode := make(map[peer.ID]reflect.Value)
for _, in := range ins {
rin := reflect.ValueOf(in)
rins = append(rins, rin)
//peertoNode[ins] = rin
}
for _, out := range outs {
rint := reflect.ValueOf(out).Elem()
//ra := reflect.ValueOf(in)
for f := 0; f < rint.NumField(); f++ {
field := rint.Type().Field(f)
var fns []reflect.Value
for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name))
}
//fn := ra.MethodByName(field.Name)
rint.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
//ctx := args[0].Interface().(context.Context)
//
//rin := peertoNode[ins[0].Leader(ctx)]
//fn := rin.MethodByName(field.Name)
//
//return fn.Call(args)
return fns[0].Call(args)
}))
}
}
}
// Start starts all enrolled nodes.
func (n *Ensemble) Start() *Ensemble {
ctx := context.Background()

View File

@ -150,6 +150,8 @@ func TestRaftState(t *testing.T) {
rstate1 := getRaftState(ctx, t, &node1)
rstate2 := getRaftState(ctx, t, &node2)
require.Equal(t, rstate0.NonceMap[miner.OwnerKey.Address], uint64(0))
require.EqualValues(t, rstate0, rstate1)
require.EqualValues(t, rstate0, rstate2)
}

View File

@ -72,9 +72,18 @@ type ConsensusOp struct {
func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
s := state.(*RaftState)
s.NonceMap[c.Addr] = c.Nonce
s.MsgUuids[c.Uuid] = c.SignedMsg
//s.Mpool.Add(context.TODO(), c.SignedMsg)
s.Mpool.Push(context.TODO(), c.SignedMsg, false)
if c.SignedMsg != nil {
tmp := *c.SignedMsg
s.MsgUuids[c.Uuid] = &tmp
_, err := s.Mpool.Push(context.TODO(), c.SignedMsg, false)
// Since this is only meant to keep messages in sync, ignore any error which
// shows the message already exists in the mpool
if err != nil && !api.ErrorIsIn(err, []error{messagepool.ErrExistingNonce}) {
return nil, err
}
}
return s, nil
}
@ -188,9 +197,7 @@ func (cc *Consensus) WaitForSync(ctx context.Context) error {
//ctx, span := trace.StartSpan(ctx, "consensus/WaitForSync")
//defer span.End()
leaderCtx, cancel := context.WithTimeout(
ctx,
time.Duration(cc.config.WaitForLeaderTimeout))
leaderCtx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout)
defer cancel()
// 1 - wait for leader
@ -323,10 +330,7 @@ func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interf
// No leader, wait for one
if err != nil {
logger.Warn("there seems to be no leader. Waiting for one")
rctx, cancel := context.WithTimeout(
ctx,
time.Duration(cc.config.WaitForLeaderTimeout),
)
rctx, cancel := context.WithTimeout(ctx, cc.config.WaitForLeaderTimeout)
defer cancel()
pidstr, err := cc.raft.WaitForLeader(rctx)

View File

@ -133,7 +133,7 @@ func (rw *raftWrapper) makeTransport() (err error) {
logger.Debug("creating libp2p Raft transport")
rw.transport, err = p2praft.NewLibp2pTransport(
rw.host,
time.Duration(rw.config.NetworkTimeout),
rw.config.NetworkTimeout,
)
return err
}