implement MessagePool.CheckReplaceMessages
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
91e774063e
commit
d782250aba
@ -256,6 +256,8 @@ type FullNode interface {
|
|||||||
MpoolCheckMessages(context.Context, []*types.Message) ([][]MessageCheckStatus, error) //perm:read
|
MpoolCheckMessages(context.Context, []*types.Message) ([][]MessageCheckStatus, error) //perm:read
|
||||||
// MpoolCheckPendingMessages performs logical checks for all pending messages from a given address
|
// MpoolCheckPendingMessages performs logical checks for all pending messages from a given address
|
||||||
MpoolCheckPendingMessages(context.Context, address.Address) ([][]MessageCheckStatus, error) //perm:read
|
MpoolCheckPendingMessages(context.Context, address.Address) ([][]MessageCheckStatus, error) //perm:read
|
||||||
|
// MpoolCheckReplaceMessages performs logical checks on pending messages with replacement
|
||||||
|
MpoolCheckReplaceMessages(context.Context, []*types.Message) ([][]MessageCheckStatus, error) //perm:read
|
||||||
|
|
||||||
// MpoolGetNonce gets next nonce for the specified sender.
|
// MpoolGetNonce gets next nonce for the specified sender.
|
||||||
// Note that this method may not be atomic. Use MpoolPushMessage instead.
|
// Note that this method may not be atomic. Use MpoolPushMessage instead.
|
||||||
|
@ -1098,6 +1098,21 @@ func (mr *MockFullNodeMockRecorder) MpoolCheckPendingMessages(arg0, arg1 interfa
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckPendingMessages", reflect.TypeOf((*MockFullNode)(nil).MpoolCheckPendingMessages), arg0, arg1)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckPendingMessages", reflect.TypeOf((*MockFullNode)(nil).MpoolCheckPendingMessages), arg0, arg1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MpoolCheckReplaceMessages mocks base method
|
||||||
|
func (m *MockFullNode) MpoolCheckReplaceMessages(arg0 context.Context, arg1 []*types.Message) ([][]api.MessageCheckStatus, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "MpoolCheckReplaceMessages", arg0, arg1)
|
||||||
|
ret0, _ := ret[0].([][]api.MessageCheckStatus)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MpoolCheckReplaceMessages indicates an expected call of MpoolCheckReplaceMessages
|
||||||
|
func (mr *MockFullNodeMockRecorder) MpoolCheckReplaceMessages(arg0, arg1 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckReplaceMessages", reflect.TypeOf((*MockFullNode)(nil).MpoolCheckReplaceMessages), arg0, arg1)
|
||||||
|
}
|
||||||
|
|
||||||
// MpoolClear mocks base method
|
// MpoolClear mocks base method
|
||||||
func (m *MockFullNode) MpoolClear(arg0 context.Context, arg1 bool) error {
|
func (m *MockFullNode) MpoolClear(arg0 context.Context, arg1 bool) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
@ -239,6 +239,8 @@ type FullNodeStruct struct {
|
|||||||
|
|
||||||
MpoolCheckPendingMessages func(p0 context.Context, p1 address.Address) ([][]MessageCheckStatus, error) `perm:"read"`
|
MpoolCheckPendingMessages func(p0 context.Context, p1 address.Address) ([][]MessageCheckStatus, error) `perm:"read"`
|
||||||
|
|
||||||
|
MpoolCheckReplaceMessages func(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) `perm:"read"`
|
||||||
|
|
||||||
MpoolClear func(p0 context.Context, p1 bool) error `perm:"write"`
|
MpoolClear func(p0 context.Context, p1 bool) error `perm:"write"`
|
||||||
|
|
||||||
MpoolGetConfig func(p0 context.Context) (*types.MpoolConfig, error) `perm:"read"`
|
MpoolGetConfig func(p0 context.Context) (*types.MpoolConfig, error) `perm:"read"`
|
||||||
@ -1529,6 +1531,14 @@ func (s *FullNodeStub) MpoolCheckPendingMessages(p0 context.Context, p1 address.
|
|||||||
return *new([][]MessageCheckStatus), xerrors.New("method not supported")
|
return *new([][]MessageCheckStatus), xerrors.New("method not supported")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *FullNodeStruct) MpoolCheckReplaceMessages(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) {
|
||||||
|
return s.Internal.MpoolCheckReplaceMessages(p0, p1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *FullNodeStub) MpoolCheckReplaceMessages(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) {
|
||||||
|
return *new([][]MessageCheckStatus), xerrors.New("method not supported")
|
||||||
|
}
|
||||||
|
|
||||||
func (s *FullNodeStruct) MpoolClear(p0 context.Context, p1 bool) error {
|
func (s *FullNodeStruct) MpoolClear(p0 context.Context, p1 bool) error {
|
||||||
return s.Internal.MpoolClear(p0, p1)
|
return s.Internal.MpoolClear(p0, p1)
|
||||||
}
|
}
|
||||||
|
@ -46,6 +46,51 @@ func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.Messa
|
|||||||
return mp.checkMessages(msgs, true)
|
return mp.checkMessages(msgs, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckReplaceMessages performs a set of logical checks for related messages while performing a
|
||||||
|
// replacement.
|
||||||
|
func (mp *MessagePool) CheckReplaceMessages(replace []*types.Message) ([][]api.MessageCheckStatus, error) {
|
||||||
|
msgMap := make(map[address.Address]map[uint64]*types.Message)
|
||||||
|
count := 0
|
||||||
|
|
||||||
|
mp.lk.Lock()
|
||||||
|
for _, m := range replace {
|
||||||
|
mmap, ok := msgMap[m.From]
|
||||||
|
if !ok {
|
||||||
|
mmap = make(map[uint64]*types.Message)
|
||||||
|
msgMap[m.From] = mmap
|
||||||
|
mset, ok := mp.pending[m.From]
|
||||||
|
if ok {
|
||||||
|
count += len(mset.msgs)
|
||||||
|
for _, sm := range mset.msgs {
|
||||||
|
mmap[sm.Message.Nonce] = &sm.Message
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mmap[m.Nonce] = m
|
||||||
|
}
|
||||||
|
mp.lk.Unlock()
|
||||||
|
|
||||||
|
msgs := make([]*types.Message, 0, count)
|
||||||
|
start := 0
|
||||||
|
for _, mmap := range msgMap {
|
||||||
|
end := start + len(mmap)
|
||||||
|
|
||||||
|
for _, m := range mmap {
|
||||||
|
msgs = append(msgs, m)
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Slice(msgs[start:end], func(i, j int) bool {
|
||||||
|
return msgs[start+i].Nonce < msgs[start+j].Nonce
|
||||||
|
})
|
||||||
|
|
||||||
|
start = end
|
||||||
|
}
|
||||||
|
|
||||||
|
return mp.checkMessages(msgs, true)
|
||||||
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool) (result [][]api.MessageCheckStatus, err error) {
|
func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool) (result [][]api.MessageCheckStatus, err error) {
|
||||||
mp.curTsLk.Lock()
|
mp.curTsLk.Lock()
|
||||||
curTs := mp.curTs
|
curTs := mp.curTs
|
||||||
|
@ -84,6 +84,7 @@
|
|||||||
* [MpoolBatchPushUntrusted](#MpoolBatchPushUntrusted)
|
* [MpoolBatchPushUntrusted](#MpoolBatchPushUntrusted)
|
||||||
* [MpoolCheckMessages](#MpoolCheckMessages)
|
* [MpoolCheckMessages](#MpoolCheckMessages)
|
||||||
* [MpoolCheckPendingMessages](#MpoolCheckPendingMessages)
|
* [MpoolCheckPendingMessages](#MpoolCheckPendingMessages)
|
||||||
|
* [MpoolCheckReplaceMessages](#MpoolCheckReplaceMessages)
|
||||||
* [MpoolClear](#MpoolClear)
|
* [MpoolClear](#MpoolClear)
|
||||||
* [MpoolGetConfig](#MpoolGetConfig)
|
* [MpoolGetConfig](#MpoolGetConfig)
|
||||||
* [MpoolGetNonce](#MpoolGetNonce)
|
* [MpoolGetNonce](#MpoolGetNonce)
|
||||||
@ -2026,6 +2027,21 @@ Inputs:
|
|||||||
|
|
||||||
Response: `null`
|
Response: `null`
|
||||||
|
|
||||||
|
### MpoolCheckReplaceMessages
|
||||||
|
MpoolCheckMessages performs logical checks on pending messages with replacement
|
||||||
|
|
||||||
|
|
||||||
|
Perms: read
|
||||||
|
|
||||||
|
Inputs:
|
||||||
|
```json
|
||||||
|
[
|
||||||
|
null
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
Response: `null`
|
||||||
|
|
||||||
### MpoolClear
|
### MpoolClear
|
||||||
MpoolClear clears pending messages from the mpool
|
MpoolClear clears pending messages from the mpool
|
||||||
|
|
||||||
|
@ -233,6 +233,10 @@ func (a *MpoolAPI) MpoolCheckPendingMessages(ctx context.Context, from address.A
|
|||||||
return a.Mpool.CheckPendingMessages(from)
|
return a.Mpool.CheckPendingMessages(from)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *MpoolAPI) MpoolCheckReplaceMessages(ctx context.Context, msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
|
||||||
|
return a.Mpool.CheckReplaceMessages(msgs)
|
||||||
|
}
|
||||||
|
|
||||||
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
|
||||||
return a.Mpool.GetNonce(ctx, addr, types.EmptyTSK)
|
return a.Mpool.GetNonce(ctx, addr, types.EmptyTSK)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user