implement MessagePool.CheckMessages

Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
vyzo 2021-03-12 17:10:12 +02:00 committed by Jakub Sztandera
parent ed61642b3a
commit 91e774063e
No known key found for this signature in database
GPG Key ID: 9A9AF56F8B3879BA
11 changed files with 541 additions and 0 deletions

View File

@ -252,6 +252,11 @@ type FullNode interface {
// MpoolBatchPushMessage batch pushes a unsigned message to mempool.
MpoolBatchPushMessage(context.Context, []*types.Message, *MessageSendSpec) ([]*types.SignedMessage, error) //perm:sign
// MpoolCheckMessages performs logical checks on a batch of messages
MpoolCheckMessages(context.Context, []*types.Message) ([][]MessageCheckStatus, error) //perm:read
// MpoolCheckPendingMessages performs logical checks for all pending messages from a given address
MpoolCheckPendingMessages(context.Context, address.Address) ([][]MessageCheckStatus, error) //perm:read
// MpoolGetNonce gets next nonce for the specified sender.
// Note that this method may not be atomic. Use MpoolPushMessage instead.
MpoolGetNonce(context.Context, address.Address) (uint64, error) //perm:read

View File

@ -0,0 +1,35 @@
// Code generated by "stringer -type=CheckStatusCode -trimprefix=CheckStatus"; DO NOT EDIT.
package api
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[CheckStatusMessageSerialize-1]
_ = x[CheckStatusMessageSize-2]
_ = x[CheckStatusMessageValidity-3]
_ = x[CheckStatusMessageMinGas-4]
_ = x[CheckStatusMessageMinBaseFee-5]
_ = x[CheckStatusMessageBaseFee-6]
_ = x[CheckStatusMessageBaseFeeLowerBound-7]
_ = x[CheckStatusMessageBaseFeeUpperBound-8]
_ = x[CheckStatusMessageGetStateNonce-9]
_ = x[CheckStatusMessageNonce-10]
_ = x[CheckStatusMessageGetStateBalance-11]
_ = x[CheckStatusMessageBalance-12]
}
const _CheckStatusCode_name = "MessageSerializeMessageSizeMessageValidityMessageMinGasMessageMinBaseFeeMessageBaseFeeMessageBaseFeeLowerBoundMessageBaseFeeUpperBoundMessageGetStateNonceMessageNonceMessageGetStateBalanceMessageBalance"
var _CheckStatusCode_index = [...]uint8{0, 16, 27, 42, 55, 72, 86, 110, 134, 154, 166, 188, 202}
func (i CheckStatusCode) String() string {
i -= 1
if i < 0 || i >= CheckStatusCode(len(_CheckStatusCode_index)-1) {
return "CheckStatusCode(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _CheckStatusCode_name[_CheckStatusCode_index[i]:_CheckStatusCode_index[i+1]]
}

View File

@ -261,6 +261,9 @@ func init() {
},
"methods": []interface{}{}},
)
addExample(api.CheckStatusCode(0))
addExample(map[string]interface{}{"abc": 123})
}
func GetAPIType(name, pkg string) (i interface{}, t, permStruct, commonPermStruct reflect.Type) {

View File

@ -1068,6 +1068,36 @@ func (mr *MockFullNodeMockRecorder) MpoolBatchPushUntrusted(arg0, arg1 interface
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolBatchPushUntrusted", reflect.TypeOf((*MockFullNode)(nil).MpoolBatchPushUntrusted), arg0, arg1)
}
// MpoolCheckMessages mocks base method
func (m *MockFullNode) MpoolCheckMessages(arg0 context.Context, arg1 []*types.Message) ([][]api.MessageCheckStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MpoolCheckMessages", arg0, arg1)
ret0, _ := ret[0].([][]api.MessageCheckStatus)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// MpoolCheckMessages indicates an expected call of MpoolCheckMessages
func (mr *MockFullNodeMockRecorder) MpoolCheckMessages(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckMessages", reflect.TypeOf((*MockFullNode)(nil).MpoolCheckMessages), arg0, arg1)
}
// MpoolCheckPendingMessages mocks base method
func (m *MockFullNode) MpoolCheckPendingMessages(arg0 context.Context, arg1 address.Address) ([][]api.MessageCheckStatus, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MpoolCheckPendingMessages", arg0, arg1)
ret0, _ := ret[0].([][]api.MessageCheckStatus)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// MpoolCheckPendingMessages indicates an expected call of MpoolCheckPendingMessages
func (mr *MockFullNodeMockRecorder) MpoolCheckPendingMessages(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolCheckPendingMessages", reflect.TypeOf((*MockFullNode)(nil).MpoolCheckPendingMessages), arg0, arg1)
}
// MpoolClear mocks base method
func (m *MockFullNode) MpoolClear(arg0 context.Context, arg1 bool) error {
m.ctrl.T.Helper()

View File

@ -235,6 +235,10 @@ type FullNodeStruct struct {
MpoolBatchPushUntrusted func(p0 context.Context, p1 []*types.SignedMessage) ([]cid.Cid, error) `perm:"write"`
MpoolCheckMessages func(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) `perm:"read"`
MpoolCheckPendingMessages func(p0 context.Context, p1 address.Address) ([][]MessageCheckStatus, error) `perm:"read"`
MpoolClear func(p0 context.Context, p1 bool) error `perm:"write"`
MpoolGetConfig func(p0 context.Context) (*types.MpoolConfig, error) `perm:"read"`
@ -1509,6 +1513,22 @@ func (s *FullNodeStub) MpoolBatchPushUntrusted(p0 context.Context, p1 []*types.S
return *new([]cid.Cid), xerrors.New("method not supported")
}
func (s *FullNodeStruct) MpoolCheckMessages(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) {
return s.Internal.MpoolCheckMessages(p0, p1)
}
func (s *FullNodeStub) MpoolCheckMessages(p0 context.Context, p1 []*types.Message) ([][]MessageCheckStatus, error) {
return *new([][]MessageCheckStatus), xerrors.New("method not supported")
}
func (s *FullNodeStruct) MpoolCheckPendingMessages(p0 context.Context, p1 address.Address) ([][]MessageCheckStatus, error) {
return s.Internal.MpoolCheckPendingMessages(p0, p1)
}
func (s *FullNodeStub) MpoolCheckPendingMessages(p0 context.Context, p1 address.Address) ([][]MessageCheckStatus, error) {
return *new([][]MessageCheckStatus), xerrors.New("method not supported")
}
func (s *FullNodeStruct) MpoolClear(p0 context.Context, p1 bool) error {
return s.Internal.MpoolClear(p0, p1)
}

View File

@ -137,3 +137,35 @@ type NodeChainStatus struct {
BlocksPerTipsetLast100 float64
BlocksPerTipsetLastFinality float64
}
type CheckStatusCode int
//go:generate go run golang.org/x/tools/cmd/stringer -type=CheckStatusCode -trimprefix=CheckStatus
const (
_ CheckStatusCode = iota
// Message Checks
CheckStatusMessageSerialize
CheckStatusMessageSize
CheckStatusMessageValidity
CheckStatusMessageMinGas
CheckStatusMessageMinBaseFee
CheckStatusMessageBaseFee
CheckStatusMessageBaseFeeLowerBound
CheckStatusMessageBaseFeeUpperBound
CheckStatusMessageGetStateNonce
CheckStatusMessageNonce
CheckStatusMessageGetStateBalance
CheckStatusMessageBalance
)
type CheckStatus struct {
Code CheckStatusCode
OK bool
Err string
Hint map[string]interface{}
}
type MessageCheckStatus struct {
Cid cid.Cid
CheckStatus
}

View File

@ -6,4 +6,5 @@ import (
_ "github.com/GeertJohan/go.rice/rice"
_ "github.com/golang/mock/mockgen"
_ "github.com/whyrusleeping/bencher"
_ "golang.org/x/tools/cmd/stringer"
)

374
chain/messagepool/check.go Normal file
View File

@ -0,0 +1,374 @@
package messagepool
import (
"context"
"fmt"
stdbig "math/big"
"sort"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
)
var baseFeeUpperBoundFactor = types.NewInt(10)
// CheckMessages performs a set of logic checks for a list of messages, prior to submitting it to the mpool
func (mp *MessagePool) CheckMessages(msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
return mp.checkMessages(msgs, false)
}
// CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
func (mp *MessagePool) CheckPendingMessages(from address.Address) ([][]api.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.Lock()
mset, ok := mp.pending[from]
if ok {
for _, sm := range mset.msgs {
msgs = append(msgs, &sm.Message)
}
}
mp.lk.Unlock()
if len(msgs) == 0 {
return nil, nil
}
sort.Slice(msgs, func(i, j int) bool {
return msgs[i].Nonce < msgs[j].Nonce
})
return mp.checkMessages(msgs, true)
}
func (mp *MessagePool) checkMessages(msgs []*types.Message, interned bool) (result [][]api.MessageCheckStatus, err error) {
mp.curTsLk.Lock()
curTs := mp.curTs
mp.curTsLk.Unlock()
epoch := curTs.Height()
var baseFee big.Int
if len(curTs.Blocks()) > 0 {
baseFee = curTs.Blocks()[0].ParentBaseFee
} else {
baseFee, err = mp.api.ChainComputeBaseFee(context.Background(), curTs)
if err != nil {
return nil, xerrors.Errorf("error computing basefee: %w", err)
}
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
baseFeeUpperBound := types.BigMul(baseFee, baseFeeUpperBoundFactor)
type actorState struct {
nextNonce uint64
requiredFunds *stdbig.Int
}
state := make(map[address.Address]*actorState)
balances := make(map[address.Address]big.Int)
result = make([][]api.MessageCheckStatus, len(msgs))
for i, m := range msgs {
// pre-check: actor nonce
check := api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageGetStateNonce,
},
}
st, ok := state[m.From]
if !ok {
mp.lk.Lock()
mset, ok := mp.pending[m.From]
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
}
state[m.From] = st
mp.lk.Unlock()
check.OK = true
check.Hint = map[string]interface{}{
"nonce": st.nextNonce,
}
} else {
mp.lk.Unlock()
stateNonce, err := mp.getStateNonce(m.From, curTs)
if err != nil {
check.OK = false
check.Err = fmt.Sprintf("error retrieving state nonce: %s", err.Error())
} else {
check.OK = true
check.Hint = map[string]interface{}{
"nonce": stateNonce,
}
}
st = &actorState{nextNonce: stateNonce, requiredFunds: new(stdbig.Int)}
state[m.From] = st
}
}
result[i] = append(result[i], check)
if !check.OK {
continue
}
// pre-check: actor balance
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageGetStateBalance,
},
}
balance, ok := balances[m.From]
if !ok {
balance, err = mp.getStateBalance(m.From, curTs)
if err != nil {
check.OK = false
check.Err = fmt.Sprintf("error retrieving state balance: %s", err)
} else {
check.OK = true
check.Hint = map[string]interface{}{
"balance": balance,
}
}
balances[m.From] = balance
} else {
check.OK = true
check.Hint = map[string]interface{}{
"balance": balance,
}
}
result[i] = append(result[i], check)
if !check.OK {
continue
}
// 1. Serialization
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageSerialize,
},
}
bytes, err := m.Serialize()
if err != nil {
check.OK = false
check.Err = err.Error()
} else {
check.OK = true
}
result[i] = append(result[i], check)
// 2. Message size
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageSize,
},
}
if len(bytes) > 32*1024-128 { // 128 bytes to account for signature size
check.OK = false
check.Err = "message too big"
} else {
check.OK = true
}
result[i] = append(result[i], check)
// 3. Syntactic validation
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageValidity,
},
}
if err := m.ValidForBlockInclusion(0, build.NewestNetworkVersion); err != nil {
check.OK = false
check.Err = fmt.Sprintf("syntactically invalid message: %s", err.Error())
} else {
check.OK = true
}
result[i] = append(result[i], check)
if !check.OK {
// skip remaining checks if it is a syntatically invalid message
continue
}
// gas checks
// 4. Min Gas
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageMinGas,
Hint: map[string]interface{}{
"minGas": minGas,
},
},
}
if m.GasLimit < minGas.Total() {
check.OK = false
check.Err = "GasLimit less than epoch minimum gas"
} else {
check.OK = true
}
result[i] = append(result[i], check)
// 5. Min Base Fee
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageMinBaseFee,
},
}
if m.GasFeeCap.LessThan(minimumBaseFee) {
check.OK = false
check.Err = "GasFeeCap less than minimum base fee"
} else {
check.OK = true
}
result[i] = append(result[i], check)
if !check.OK {
goto checkState
}
// 6. Base Fee
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageBaseFee,
Hint: map[string]interface{}{
"baseFee": baseFee,
},
},
}
if m.GasFeeCap.LessThan(baseFee) {
check.OK = false
check.Err = "GasFeeCap less than current base fee"
} else {
check.OK = true
}
result[i] = append(result[i], check)
// 7. Base Fee lower bound
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageBaseFeeLowerBound,
Hint: map[string]interface{}{
"baseFeeLowerBound": baseFeeLowerBound,
},
},
}
if m.GasFeeCap.LessThan(baseFeeLowerBound) {
check.OK = false
check.Err = "GasFeeCap less than base fee lower bound for inclusion in next 20 epochs"
} else {
check.OK = true
}
result[i] = append(result[i], check)
if !check.OK {
goto checkState
}
// 8. Base Fee upper bound
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageBaseFeeUpperBound,
Hint: map[string]interface{}{
"baseFeeUpperBound": baseFeeUpperBound,
},
},
}
if m.GasFeeCap.LessThan(baseFeeUpperBound) {
check.OK = false
check.Err = "GasFeeCap less than base fee upper bound for inclusion in next 20 epochs"
} else {
check.OK = true
}
result[i] = append(result[i], check)
// stateful checks
checkState:
// 9. Message Nonce
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageNonce,
Hint: map[string]interface{}{
"nextNonce": st.nextNonce,
},
},
}
if st.nextNonce != m.Nonce {
check.OK = false
check.Err = fmt.Sprintf("message nonce doesn't match next nonce (%d)", st.nextNonce)
} else {
check.OK = true
st.nextNonce++
}
result[i] = append(result[i], check)
// check required funds -vs- balance
st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.RequiredFunds().Int)
st.requiredFunds.Add(st.requiredFunds, m.Value.Int)
// 10. Balance
check = api.MessageCheckStatus{
Cid: m.Cid(),
CheckStatus: api.CheckStatus{
Code: api.CheckStatusMessageBalance,
Hint: map[string]interface{}{
"requiredFunds": big.Int{Int: stdbig.NewInt(0).Set(st.requiredFunds)},
},
},
}
if balance.Int.Cmp(st.requiredFunds) < 0 {
check.OK = false
check.Err = "insufficient balance"
} else {
check.OK = true
}
result[i] = append(result[i], check)
}
return result, nil
}

View File

@ -82,6 +82,8 @@
* [MpoolBatchPush](#MpoolBatchPush)
* [MpoolBatchPushMessage](#MpoolBatchPushMessage)
* [MpoolBatchPushUntrusted](#MpoolBatchPushUntrusted)
* [MpoolCheckMessages](#MpoolCheckMessages)
* [MpoolCheckPendingMessages](#MpoolCheckPendingMessages)
* [MpoolClear](#MpoolClear)
* [MpoolGetConfig](#MpoolGetConfig)
* [MpoolGetNonce](#MpoolGetNonce)
@ -1994,6 +1996,36 @@ Inputs:
Response: `null`
### MpoolCheckMessages
MpoolCheckMessages performs logical checks on a batch of messages
Perms: read
Inputs:
```json
[
null
]
```
Response: `null`
### MpoolCheckPendingMessages
MpoolCheckPendingMessages performs logical checks for all pending messages from a given address
Perms: read
Inputs:
```json
[
"f01234"
]
```
Response: `null`
### MpoolClear
MpoolClear clears pending messages from the mpool

1
go.mod
View File

@ -150,6 +150,7 @@ require (
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20201112185108-eeaa07dd7696
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.28

View File

@ -225,6 +225,14 @@ func (a *MpoolAPI) MpoolBatchPushMessage(ctx context.Context, msgs []*types.Mess
return smsgs, nil
}
func (a *MpoolAPI) MpoolCheckMessages(ctx context.Context, msgs []*types.Message) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckMessages(msgs)
}
func (a *MpoolAPI) MpoolCheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
return a.Mpool.CheckPendingMessages(from)
}
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(ctx, addr, types.EmptyTSK)
}