Change Mpool push API to have an option to publish
This commit is contained in:
parent
986c5e3c68
commit
dde204fb6a
@ -667,7 +667,7 @@ func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, curTs *types.T
|
|||||||
return publish, nil
|
return publish, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Cid, error) {
|
func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage, publish bool) (cid.Cid, error) {
|
||||||
done := metrics.Timer(ctx, metrics.MpoolPushDuration)
|
done := metrics.Timer(ctx, metrics.MpoolPushDuration)
|
||||||
defer done()
|
defer done()
|
||||||
|
|
||||||
@ -683,14 +683,14 @@ func (mp *MessagePool) Push(ctx context.Context, m *types.SignedMessage) (cid.Ci
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
publish, err := mp.addTs(ctx, m, mp.curTs, true, false)
|
ok, err := mp.addTs(ctx, m, mp.curTs, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
mp.curTsLk.Unlock()
|
mp.curTsLk.Unlock()
|
||||||
|
|
||||||
if publish {
|
if ok && publish {
|
||||||
msgb, err := m.Serialize()
|
msgb, err := m.Serialize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("error serializing message: %w", err)
|
return cid.Undef, xerrors.Errorf("error serializing message: %w", err)
|
||||||
|
@ -539,7 +539,7 @@ func TestLoadLocal(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||||
cid, err := mp.Push(context.TODO(), m)
|
cid, err := mp.Push(context.TODO(), m, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -612,7 +612,7 @@ func TestClearAll(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||||
_, err := mp.Push(context.TODO(), m)
|
_, err := mp.Push(context.TODO(), m, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -670,7 +670,7 @@ func TestClearNonLocal(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||||
_, err := mp.Push(context.TODO(), m)
|
_, err := mp.Push(context.TODO(), m, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -743,7 +743,7 @@ func TestUpdates(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||||
_, err := mp.Push(context.TODO(), m)
|
_, err := mp.Push(context.TODO(), m, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -60,7 +60,7 @@ func TestRepubMessages(t *testing.T) {
|
|||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||||
_, err := mp.Push(context.TODO(), m)
|
_, err := mp.Push(context.TODO(), m, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -235,6 +235,11 @@ func RouteRequest() {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Not thread safe
|
||||||
|
func OnSingleNode(ctx context.Context) context.Context {
|
||||||
|
return context.WithValue(ctx, "retry node", new(*int))
|
||||||
|
}
|
||||||
|
|
||||||
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
|
func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
|
||||||
outs := api.GetInternalStructs(outstr)
|
outs := api.GetInternalStructs(outstr)
|
||||||
|
|
||||||
@ -290,7 +295,18 @@ func FullNodeProxy[T api.FullNode](ins []T, outstr *api.FullNodeStruct) {
|
|||||||
//
|
//
|
||||||
//return fn.Call(args)
|
//return fn.Call(args)
|
||||||
|
|
||||||
toCall := curr.Inc() % int64(total)
|
// for calls that need to be performed on the same node
|
||||||
|
// primarily for miner when calling create block and submit block subsequently
|
||||||
|
var toCall int64
|
||||||
|
if ctx.Value("retry node") == nil {
|
||||||
|
toCall = curr.Inc() % int64(total)
|
||||||
|
} else if (*ctx.Value("retry node").(**int64)) == nil {
|
||||||
|
toCall = curr.Inc() % int64(total)
|
||||||
|
*ctx.Value("retry node").(**int64) = &toCall
|
||||||
|
} else {
|
||||||
|
toCall = **ctx.Value("retry node").(**int64)
|
||||||
|
}
|
||||||
|
//toCall := curr.Inc() % int64(total)
|
||||||
|
|
||||||
result := fns[toCall].Call(args)
|
result := fns[toCall].Call(args)
|
||||||
if result[len(result)-1].IsNil() {
|
if result[len(result)-1].IsNil() {
|
||||||
|
@ -25,7 +25,7 @@ var (
|
|||||||
DefaultNetworkTimeout = 100 * time.Second
|
DefaultNetworkTimeout = 100 * time.Second
|
||||||
DefaultCommitRetryDelay = 200 * time.Millisecond
|
DefaultCommitRetryDelay = 200 * time.Millisecond
|
||||||
DefaultBackupsRotate = 6
|
DefaultBackupsRotate = 6
|
||||||
DefaultDatastoreNamespace = "/r" // from "/raft"
|
DefaultDatastoreNamespace = "/r"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ClusterRaftConfig allows to configure the Raft Consensus component for the node cluster.
|
// ClusterRaftConfig allows to configure the Raft Consensus component for the node cluster.
|
||||||
@ -82,7 +82,6 @@ func DefaultClusterRaftConfig() *ClusterRaftConfig {
|
|||||||
|
|
||||||
// Set up logging
|
// Set up logging
|
||||||
cfg.RaftConfig.LogOutput = ioutil.Discard
|
cfg.RaftConfig.LogOutput = ioutil.Discard
|
||||||
//cfg.RaftConfig.Logger = &hcLogToLogger{}
|
|
||||||
return &cfg
|
return &cfg
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,98 +105,11 @@ func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftCon
|
|||||||
|
|
||||||
// Set up logging
|
// Set up logging
|
||||||
cfg.RaftConfig.LogOutput = ioutil.Discard
|
cfg.RaftConfig.LogOutput = ioutil.Discard
|
||||||
//cfg.RaftConfig.Logger = &hcLogToLogger{}
|
|
||||||
|
|
||||||
//addrInfos, err := addrutil.ParseAddresses(context.Background(), userRaftConfig.InitPeersetMultiAddr)
|
|
||||||
//if err != nil {
|
|
||||||
// return nil
|
|
||||||
//}
|
|
||||||
//for _, addrInfo := range addrInfos {
|
|
||||||
// cfg.InitPeerset = append(cfg.InitPeerset, addrInfo.ID)
|
|
||||||
//}
|
|
||||||
|
|
||||||
return &cfg
|
return &cfg
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConfigJSON represents a human-friendly Config
|
|
||||||
// object which can be saved to JSON. Most configuration keys are converted
|
|
||||||
// into simple types like strings, and key names aim to be self-explanatory
|
|
||||||
// for the user.
|
|
||||||
// Check https://godoc.org/github.com/hashicorp/raft#Config for extended
|
|
||||||
// description on all Raft-specific keys.
|
|
||||||
//type jsonConfig struct {
|
|
||||||
// // Storage folder for snapshots, log store etc. Used by
|
|
||||||
// // the Raft.
|
|
||||||
// DataFolder string `json:"data_folder,omitempty"`
|
|
||||||
//
|
|
||||||
// // InitPeerset provides the list of initial cluster peers for new Raft
|
|
||||||
// // peers (with no prior state). It is ignored when Raft was already
|
|
||||||
// // initialized or when starting in staging mode.
|
|
||||||
// InitPeerset []string `json:"init_peerset"`
|
|
||||||
//
|
|
||||||
// // How long to wait for a leader before failing
|
|
||||||
// WaitForLeaderTimeout string `json:"wait_for_leader_timeout"`
|
|
||||||
//
|
|
||||||
// // How long to wait before timing out network operations
|
|
||||||
// NetworkTimeout string `json:"network_timeout"`
|
|
||||||
//
|
|
||||||
// // How many retries to make upon a failed commit
|
|
||||||
// CommitRetries int `json:"commit_retries"`
|
|
||||||
//
|
|
||||||
// // How long to wait between commit retries
|
|
||||||
// CommitRetryDelay string `json:"commit_retry_delay"`
|
|
||||||
//
|
|
||||||
// // BackupsRotate specifies the maximum number of Raft's DataFolder
|
|
||||||
// // copies that we keep as backups (renaming) after cleanup.
|
|
||||||
// BackupsRotate int `json:"backups_rotate"`
|
|
||||||
//
|
|
||||||
// DatastoreNamespace string `json:"datastore_namespace,omitempty"`
|
|
||||||
//
|
|
||||||
// // HeartbeatTimeout specifies the time in follower state without
|
|
||||||
// // a leader before we attempt an election.
|
|
||||||
// HeartbeatTimeout string `json:"heartbeat_timeout,omitempty"`
|
|
||||||
//
|
|
||||||
// // ElectionTimeout specifies the time in candidate state without
|
|
||||||
// // a leader before we attempt an election.
|
|
||||||
// ElectionTimeout string `json:"election_timeout,omitempty"`
|
|
||||||
//
|
|
||||||
// // CommitTimeout controls the time without an Apply() operation
|
|
||||||
// // before we heartbeat to ensure a timely commit.
|
|
||||||
// CommitTimeout string `json:"commit_timeout,omitempty"`
|
|
||||||
//
|
|
||||||
// // MaxAppendEntries controls the maximum number of append entries
|
|
||||||
// // to send at once.
|
|
||||||
// MaxAppendEntries int `json:"max_append_entries,omitempty"`
|
|
||||||
//
|
|
||||||
// // TrailingLogs controls how many logs we leave after a snapshot.
|
|
||||||
// TrailingLogs uint64 `json:"trailing_logs,omitempty"`
|
|
||||||
//
|
|
||||||
// // SnapshotInterval controls how often we check if we should perform
|
|
||||||
// // a snapshot.
|
|
||||||
// SnapshotInterval string `json:"snapshot_interval,omitempty"`
|
|
||||||
//
|
|
||||||
// // SnapshotThreshold controls how many outstanding logs there must be
|
|
||||||
// // before we perform a snapshot.
|
|
||||||
// SnapshotThreshold uint64 `json:"snapshot_threshold,omitempty"`
|
|
||||||
//
|
|
||||||
// // LeaderLeaseTimeout is used to control how long the "lease" lasts
|
|
||||||
// // for being the leader without being able to contact a quorum
|
|
||||||
// // of nodes. If we reach this interval without contact, we will
|
|
||||||
// // step down as leader.
|
|
||||||
// LeaderLeaseTimeout string `json:"leader_lease_timeout,omitempty"`
|
|
||||||
//
|
|
||||||
// // The unique ID for this server across all time. When running with
|
|
||||||
// // ProtocolVersion < 3, you must set this to be the same as the network
|
|
||||||
// // address of your transport.
|
|
||||||
// // LocalID string `json:local_id`
|
|
||||||
//}
|
|
||||||
|
|
||||||
// ConfigKey returns a human-friendly indentifier for this Config.
|
|
||||||
//func (cfg *config.ClusterRaftConfig) ConfigKey() string {
|
|
||||||
// return configKey
|
|
||||||
//}
|
|
||||||
|
|
||||||
//// Validate checks that this configuration has working values,
|
//// Validate checks that this configuration has working values,
|
||||||
//// at least in appearance.
|
//// at least in appearance.
|
||||||
func ValidateConfig(cfg *ClusterRaftConfig) error {
|
func ValidateConfig(cfg *ClusterRaftConfig) error {
|
||||||
@ -227,139 +139,6 @@ func ValidateConfig(cfg *ClusterRaftConfig) error {
|
|||||||
return hraft.ValidateConfig(cfg.RaftConfig)
|
return hraft.ValidateConfig(cfg.RaftConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadJSON parses a json-encoded configuration (see jsonConfig).
|
|
||||||
// The Config will have default values for all fields not explicited
|
|
||||||
// in the given json object.
|
|
||||||
//func (cfg *Config) LoadJSON(raw []byte) error {
|
|
||||||
// jcfg := &jsonConfig{}
|
|
||||||
// err := json.Unmarshal(raw, jcfg)
|
|
||||||
// if err != nil {
|
|
||||||
// logger.Error("Error unmarshaling raft config")
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// cfg.Default()
|
|
||||||
//
|
|
||||||
// return cfg.applyJSONConfig(jcfg)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
|
|
||||||
// parseDuration := func(txt string) time.Duration {
|
|
||||||
// d, _ := time.ParseDuration(txt)
|
|
||||||
// if txt != "" && d == 0 {
|
|
||||||
// logger.Warnf("%s is not a valid duration. Default will be used", txt)
|
|
||||||
// }
|
|
||||||
// return d
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // Parse durations. We ignore errors as 0 will take Default values.
|
|
||||||
// waitForLeaderTimeout := parseDuration(jcfg.WaitForLeaderTimeout)
|
|
||||||
// networkTimeout := parseDuration(jcfg.NetworkTimeout)
|
|
||||||
// commitRetryDelay := parseDuration(jcfg.CommitRetryDelay)
|
|
||||||
// heartbeatTimeout := parseDuration(jcfg.HeartbeatTimeout)
|
|
||||||
// electionTimeout := parseDuration(jcfg.ElectionTimeout)
|
|
||||||
// commitTimeout := parseDuration(jcfg.CommitTimeout)
|
|
||||||
// snapshotInterval := parseDuration(jcfg.SnapshotInterval)
|
|
||||||
// leaderLeaseTimeout := parseDuration(jcfg.LeaderLeaseTimeout)
|
|
||||||
//
|
|
||||||
// // Set all values in config. For some, take defaults if they are 0.
|
|
||||||
// // Set values from jcfg if they are not 0 values
|
|
||||||
//
|
|
||||||
// // Own values
|
|
||||||
// config.SetIfNotDefault(jcfg.DataFolder, &cfg.DataFolder)
|
|
||||||
// config.SetIfNotDefault(waitForLeaderTimeout, &cfg.WaitForLeaderTimeout)
|
|
||||||
// config.SetIfNotDefault(networkTimeout, &cfg.NetworkTimeout)
|
|
||||||
// cfg.CommitRetries = jcfg.CommitRetries
|
|
||||||
// config.SetIfNotDefault(commitRetryDelay, &cfg.CommitRetryDelay)
|
|
||||||
// config.SetIfNotDefault(jcfg.BackupsRotate, &cfg.BackupsRotate)
|
|
||||||
//
|
|
||||||
// // Raft values
|
|
||||||
// config.SetIfNotDefault(heartbeatTimeout, &cfg.RaftConfig.HeartbeatTimeout)
|
|
||||||
// config.SetIfNotDefault(electionTimeout, &cfg.RaftConfig.ElectionTimeout)
|
|
||||||
// config.SetIfNotDefault(commitTimeout, &cfg.RaftConfig.CommitTimeout)
|
|
||||||
// config.SetIfNotDefault(jcfg.MaxAppendEntries, &cfg.RaftConfig.MaxAppendEntries)
|
|
||||||
// config.SetIfNotDefault(jcfg.TrailingLogs, &cfg.RaftConfig.TrailingLogs)
|
|
||||||
// config.SetIfNotDefault(snapshotInterval, &cfg.RaftConfig.SnapshotInterval)
|
|
||||||
// config.SetIfNotDefault(jcfg.SnapshotThreshold, &cfg.RaftConfig.SnapshotThreshold)
|
|
||||||
// config.SetIfNotDefault(leaderLeaseTimeout, &cfg.RaftConfig.LeaderLeaseTimeout)
|
|
||||||
//
|
|
||||||
// cfg.InitPeerset = api.StringsToPeers(jcfg.InitPeerset)
|
|
||||||
// return cfg.Validate()
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//// ToJSON returns the pretty JSON representation of a Config.
|
|
||||||
//func (cfg *Config) ToJSON() ([]byte, error) {
|
|
||||||
// jcfg := cfg.toJSONConfig()
|
|
||||||
//
|
|
||||||
// return config.DefaultJSONMarshal(jcfg)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func (cfg *Config) toJSONConfig() *jsonConfig {
|
|
||||||
// jcfg := &jsonConfig{
|
|
||||||
// DataFolder: cfg.DataFolder,
|
|
||||||
// InitPeerset: api.PeersToStrings(cfg.InitPeerset),
|
|
||||||
// WaitForLeaderTimeout: cfg.WaitForLeaderTimeout.String(),
|
|
||||||
// NetworkTimeout: cfg.NetworkTimeout.String(),
|
|
||||||
// CommitRetries: cfg.CommitRetries,
|
|
||||||
// CommitRetryDelay: cfg.CommitRetryDelay.String(),
|
|
||||||
// BackupsRotate: cfg.BackupsRotate,
|
|
||||||
// HeartbeatTimeout: cfg.RaftConfig.HeartbeatTimeout.String(),
|
|
||||||
// ElectionTimeout: cfg.RaftConfig.ElectionTimeout.String(),
|
|
||||||
// CommitTimeout: cfg.RaftConfig.CommitTimeout.String(),
|
|
||||||
// MaxAppendEntries: cfg.RaftConfig.MaxAppendEntries,
|
|
||||||
// TrailingLogs: cfg.RaftConfig.TrailingLogs,
|
|
||||||
// SnapshotInterval: cfg.RaftConfig.SnapshotInterval.String(),
|
|
||||||
// SnapshotThreshold: cfg.RaftConfig.SnapshotThreshold,
|
|
||||||
// LeaderLeaseTimeout: cfg.RaftConfig.LeaderLeaseTimeout.String(),
|
|
||||||
// }
|
|
||||||
// if cfg.DatastoreNamespace != DefaultDatastoreNamespace {
|
|
||||||
// jcfg.DatastoreNamespace = cfg.DatastoreNamespace
|
|
||||||
// // otherwise leave empty so it gets omitted.
|
|
||||||
// }
|
|
||||||
// return jcfg
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
// Default initializes this configuration with working defaults.
|
|
||||||
//func (cfg *config.ClusterRaftConfig) Default() {
|
|
||||||
// cfg.DataFolder = "" // empty so it gets omitted
|
|
||||||
// cfg.InitPeerset = []peer.ID{}
|
|
||||||
// cfg.WaitForLeaderTimeout = DefaultWaitForLeaderTimeout
|
|
||||||
// cfg.NetworkTimeout = DefaultNetworkTimeout
|
|
||||||
// cfg.CommitRetries = DefaultCommitRetries
|
|
||||||
// cfg.CommitRetryDelay = DefaultCommitRetryDelay
|
|
||||||
// cfg.BackupsRotate = DefaultBackupsRotate
|
|
||||||
// cfg.DatastoreNamespace = DefaultDatastoreNamespace
|
|
||||||
// cfg.RaftConfig = hraft.DefaultConfig()
|
|
||||||
//
|
|
||||||
// // These options are imposed over any Default Raft Config.
|
|
||||||
// cfg.RaftConfig.ShutdownOnRemove = false
|
|
||||||
// cfg.RaftConfig.LocalID = "will_be_set_automatically"
|
|
||||||
//
|
|
||||||
// // Set up logging
|
|
||||||
// cfg.RaftConfig.LogOutput = ioutil.Discard
|
|
||||||
// //cfg.RaftConfig.Logger = &hcLogToLogger{}
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
//func NewDefaultConfig() *config.ClusterRaftConfig {
|
|
||||||
// var cfg config.ClusterRaftConfig
|
|
||||||
// cfg.Default()
|
|
||||||
// return &cfg
|
|
||||||
//}
|
|
||||||
|
|
||||||
//
|
|
||||||
//// ApplyEnvVars fills in any Config fields found
|
|
||||||
//// as environment variables.
|
|
||||||
//func (cfg *Config) ApplyEnvVars() error {
|
|
||||||
// jcfg := cfg.toJSONConfig()
|
|
||||||
//
|
|
||||||
// err := envconfig.Process(envConfigKey, jcfg)
|
|
||||||
// if err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return cfg.applyJSONConfig(jcfg)
|
|
||||||
//}
|
|
||||||
//
|
|
||||||
// GetDataFolder returns the Raft data folder that we are using.
|
// GetDataFolder returns the Raft data folder that we are using.
|
||||||
func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string {
|
func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string {
|
||||||
if cfg.DataFolder == "" {
|
if cfg.DataFolder == "" {
|
||||||
@ -367,9 +146,3 @@ func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string {
|
|||||||
}
|
}
|
||||||
return filepath.Join(repo.Path(), cfg.DataFolder)
|
return filepath.Join(repo.Path(), cfg.DataFolder)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
//// ToDisplayJSON returns JSON config as a string.
|
|
||||||
//func (cfg *Config) ToDisplayJSON() ([]byte, error) {
|
|
||||||
// return config.DisplayJSON(cfg.toJSONConfig())
|
|
||||||
//}
|
|
||||||
|
@ -73,7 +73,8 @@ func (c ConsensusOp) ApplyTo(state consensus.State) (consensus.State, error) {
|
|||||||
s := state.(*RaftState)
|
s := state.(*RaftState)
|
||||||
s.NonceMap[c.Addr] = c.Nonce
|
s.NonceMap[c.Addr] = c.Nonce
|
||||||
s.MsgUuids[c.Uuid] = c.SignedMsg
|
s.MsgUuids[c.Uuid] = c.SignedMsg
|
||||||
s.Mpool.Add(context.TODO(), c.SignedMsg)
|
//s.Mpool.Add(context.TODO(), c.SignedMsg)
|
||||||
|
s.Mpool.Push(context.TODO(), c.SignedMsg, false)
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
func (m *MpoolModule) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||||
return m.Mpool.Push(ctx, smsg)
|
return m.Mpool.Push(ctx, smsg, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
|
||||||
@ -237,7 +237,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spe
|
|||||||
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
func (a *MpoolAPI) MpoolBatchPush(ctx context.Context, smsgs []*types.SignedMessage) ([]cid.Cid, error) {
|
||||||
var messageCids []cid.Cid
|
var messageCids []cid.Cid
|
||||||
for _, smsg := range smsgs {
|
for _, smsg := range smsgs {
|
||||||
smsgCid, err := a.Mpool.Push(ctx, smsg)
|
smsgCid, err := a.Mpool.Push(ctx, smsg, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return messageCids, err
|
return messageCids, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user