Merge branch 'master' into feat/fvm
This commit is contained in:
commit
6e1d5c5733
@ -850,6 +850,11 @@ workflows:
|
||||
suite: itest-get_messages_in_ts
|
||||
target: "./itests/get_messages_in_ts_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-mempool
|
||||
suite: itest-mempool
|
||||
target: "./itests/mempool_test.go"
|
||||
|
||||
- test:
|
||||
name: test-itest-multisig
|
||||
suite: itest-multisig
|
||||
|
2
Makefile
2
Makefile
@ -345,6 +345,8 @@ gen: actors-gen type-gen method-gen cfgdoc-gen docsgen api-gen circleci
|
||||
@echo ">>> IF YOU'VE MODIFIED THE CLI OR CONFIG, REMEMBER TO ALSO MAKE docsgen-cli"
|
||||
.PHONY: gen
|
||||
|
||||
jen: gen
|
||||
|
||||
snap: lotus lotus-miner lotus-worker
|
||||
snapcraft
|
||||
# snapcraft upload ./lotus_*.snap
|
||||
|
21
blockstore/context.go
Normal file
21
blockstore/context.go
Normal file
@ -0,0 +1,21 @@
|
||||
package blockstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type hotViewKey struct{}
|
||||
|
||||
var hotView = hotViewKey{}
|
||||
|
||||
// WithHotView constructs a new context with an option that provides a hint to the blockstore
|
||||
// (e.g. the splitstore) that the object (and its ipld references) should be kept hot.
|
||||
func WithHotView(ctx context.Context) context.Context {
|
||||
return context.WithValue(ctx, hotView, struct{}{})
|
||||
}
|
||||
|
||||
// IsHotView returns true if the hot view option is set in the context
|
||||
func IsHotView(ctx context.Context) bool {
|
||||
v := ctx.Value(hotView)
|
||||
return v != nil
|
||||
}
|
@ -161,6 +161,13 @@ type SplitStore struct {
|
||||
txnSyncCond sync.Cond
|
||||
txnSync bool
|
||||
|
||||
// background cold object reification
|
||||
reifyWorkers sync.WaitGroup
|
||||
reifyMx sync.Mutex
|
||||
reifyCond sync.Cond
|
||||
reifyPend map[cid.Cid]struct{}
|
||||
reifyInProgress map[cid.Cid]struct{}
|
||||
|
||||
// registered protectors
|
||||
protectors []func(func(cid.Cid) error) error
|
||||
}
|
||||
@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
|
||||
ss.txnSyncCond.L = &ss.txnSyncMx
|
||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||
|
||||
ss.reifyCond.L = &ss.reifyMx
|
||||
ss.reifyPend = make(map[cid.Cid]struct{})
|
||||
ss.reifyInProgress = make(map[cid.Cid]struct{})
|
||||
|
||||
if enableDebugLog {
|
||||
ss.debug, err = openDebugLog(path)
|
||||
if err != nil {
|
||||
@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return s.cold.Has(ctx, cid)
|
||||
has, err = s.cold.Has(ctx, cid)
|
||||
if has && bstore.IsHotView(ctx) {
|
||||
s.reifyColdObject(cid)
|
||||
}
|
||||
|
||||
return has, err
|
||||
|
||||
}
|
||||
|
||||
func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
|
||||
@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
|
||||
|
||||
blk, err = s.cold.Get(ctx, cid)
|
||||
if err == nil {
|
||||
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
|
||||
if bstore.IsHotView(ctx) {
|
||||
s.reifyColdObject(cid)
|
||||
}
|
||||
|
||||
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
|
||||
}
|
||||
return blk, err
|
||||
|
||||
@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
|
||||
|
||||
size, err = s.cold.GetSize(ctx, cid)
|
||||
if err == nil {
|
||||
if bstore.IsHotView(ctx) {
|
||||
s.reifyColdObject(cid)
|
||||
}
|
||||
|
||||
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
|
||||
}
|
||||
return size, err
|
||||
@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
|
||||
|
||||
err = s.cold.View(ctx, cid, cb)
|
||||
if err == nil {
|
||||
if bstore.IsHotView(ctx) {
|
||||
s.reifyColdObject(cid)
|
||||
}
|
||||
|
||||
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
|
||||
}
|
||||
return err
|
||||
@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
|
||||
}
|
||||
}
|
||||
|
||||
// spawn the reifier
|
||||
go s.reifyOrchestrator()
|
||||
|
||||
// watch the chain
|
||||
chain.SubscribeHeadChanges(s.HeadChange)
|
||||
|
||||
@ -676,6 +707,8 @@ func (s *SplitStore) Close() error {
|
||||
}
|
||||
}
|
||||
|
||||
s.reifyCond.Broadcast()
|
||||
s.reifyWorkers.Wait()
|
||||
s.cancel()
|
||||
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
|
||||
}
|
||||
|
203
blockstore/splitstore/splitstore_reify.go
Normal file
203
blockstore/splitstore/splitstore_reify.go
Normal file
@ -0,0 +1,203 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
var EnableReification = false
|
||||
|
||||
func (s *SplitStore) reifyColdObject(c cid.Cid) {
|
||||
if !EnableReification {
|
||||
return
|
||||
}
|
||||
|
||||
if !s.isWarm() {
|
||||
return
|
||||
}
|
||||
|
||||
if isUnitaryObject(c) {
|
||||
return
|
||||
}
|
||||
|
||||
s.reifyMx.Lock()
|
||||
defer s.reifyMx.Unlock()
|
||||
|
||||
_, ok := s.reifyInProgress[c]
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
|
||||
s.reifyPend[c] = struct{}{}
|
||||
s.reifyCond.Broadcast()
|
||||
}
|
||||
|
||||
func (s *SplitStore) reifyOrchestrator() {
|
||||
workers := runtime.NumCPU() / 4
|
||||
if workers < 2 {
|
||||
workers = 2
|
||||
}
|
||||
|
||||
workch := make(chan cid.Cid, workers)
|
||||
defer close(workch)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
s.reifyWorkers.Add(1)
|
||||
go s.reifyWorker(workch)
|
||||
}
|
||||
|
||||
for {
|
||||
s.reifyMx.Lock()
|
||||
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
|
||||
s.reifyCond.Wait()
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&s.closing) != 0 {
|
||||
s.reifyMx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
reifyPend := s.reifyPend
|
||||
s.reifyPend = make(map[cid.Cid]struct{})
|
||||
s.reifyMx.Unlock()
|
||||
|
||||
for c := range reifyPend {
|
||||
select {
|
||||
case workch <- c:
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
|
||||
defer s.reifyWorkers.Done()
|
||||
for c := range workch {
|
||||
s.doReify(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SplitStore) doReify(c cid.Cid) {
|
||||
var toreify, totrack, toforget []cid.Cid
|
||||
|
||||
defer func() {
|
||||
s.reifyMx.Lock()
|
||||
defer s.reifyMx.Unlock()
|
||||
|
||||
for _, c := range toreify {
|
||||
delete(s.reifyInProgress, c)
|
||||
}
|
||||
for _, c := range totrack {
|
||||
delete(s.reifyInProgress, c)
|
||||
}
|
||||
for _, c := range toforget {
|
||||
delete(s.reifyInProgress, c)
|
||||
}
|
||||
}()
|
||||
|
||||
s.txnLk.RLock()
|
||||
defer s.txnLk.RUnlock()
|
||||
|
||||
err := s.walkObjectIncomplete(c, newTmpVisitor(),
|
||||
func(c cid.Cid) error {
|
||||
if isUnitaryObject(c) {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
s.reifyMx.Lock()
|
||||
_, inProgress := s.reifyInProgress[c]
|
||||
if !inProgress {
|
||||
s.reifyInProgress[c] = struct{}{}
|
||||
}
|
||||
s.reifyMx.Unlock()
|
||||
|
||||
if inProgress {
|
||||
return errStopWalk
|
||||
}
|
||||
|
||||
has, err := s.hot.Has(s.ctx, c)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error checking hotstore: %w", err)
|
||||
}
|
||||
|
||||
if has {
|
||||
if s.txnMarkSet != nil {
|
||||
hasMark, err := s.txnMarkSet.Has(c)
|
||||
if err != nil {
|
||||
log.Warnf("error checking markset: %s", err)
|
||||
} else if hasMark {
|
||||
toforget = append(toforget, c)
|
||||
return errStopWalk
|
||||
}
|
||||
} else {
|
||||
totrack = append(totrack, c)
|
||||
return errStopWalk
|
||||
}
|
||||
}
|
||||
|
||||
toreify = append(toreify, c)
|
||||
return nil
|
||||
},
|
||||
func(missing cid.Cid) error {
|
||||
log.Warnf("missing reference while reifying %s: %s", c, missing)
|
||||
return errStopWalk
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("reifying %d objects rooted at %s", len(toreify), c)
|
||||
|
||||
// this should not get too big, maybe some 100s of objects.
|
||||
batch := make([]blocks.Block, 0, len(toreify))
|
||||
for _, c := range toreify {
|
||||
blk, err := s.cold.Get(s.ctx, c)
|
||||
if err != nil {
|
||||
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.checkClosing(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
batch = append(batch, blk)
|
||||
}
|
||||
|
||||
if len(batch) > 0 {
|
||||
err = s.hot.PutMany(s.ctx, batch)
|
||||
if err != nil {
|
||||
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if s.txnMarkSet != nil {
|
||||
if len(toreify) > 0 {
|
||||
if err := s.txnMarkSet.MarkMany(toreify); err != nil {
|
||||
log.Warnf("error marking reified objects: %s", err)
|
||||
}
|
||||
}
|
||||
if len(totrack) > 0 {
|
||||
if err := s.txnMarkSet.MarkMany(totrack); err != nil {
|
||||
log.Warnf("error marking tracked objects: %s", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// if txnActive is false these are noops
|
||||
if len(toreify) > 0 {
|
||||
s.trackTxnRefMany(toreify)
|
||||
}
|
||||
if len(totrack) > 0 {
|
||||
s.trackTxnRefMany(totrack)
|
||||
}
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@ -387,6 +388,136 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
|
||||
ds := dssync.MutexWrap(datastore.NewMapDatastore())
|
||||
hot := newMockStore()
|
||||
cold := newMockStore()
|
||||
|
||||
mkRandomBlock := func() blocks.Block {
|
||||
data := make([]byte, 128)
|
||||
_, err := rand.Read(data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return blocks.NewBlock(data)
|
||||
}
|
||||
|
||||
block1 := mkRandomBlock()
|
||||
block2 := mkRandomBlock()
|
||||
block3 := mkRandomBlock()
|
||||
|
||||
hdr := mock.MkBlock(nil, 0, 0)
|
||||
hdr.Messages = block1.Cid()
|
||||
hdr.ParentMessageReceipts = block2.Cid()
|
||||
hdr.ParentStateRoot = block3.Cid()
|
||||
block4, err := hdr.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
allBlocks := []blocks.Block{block1, block2, block3, block4}
|
||||
for _, blk := range allBlocks {
|
||||
err := cold.Put(context.Background(), blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
path, err := ioutil.TempDir("", "splitstore.*")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = os.RemoveAll(path)
|
||||
})
|
||||
|
||||
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ss.Close() //nolint
|
||||
|
||||
ss.warmupEpoch = 1
|
||||
go ss.reifyOrchestrator()
|
||||
|
||||
waitForReification := func() {
|
||||
for {
|
||||
ss.reifyMx.Lock()
|
||||
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
|
||||
ss.reifyMx.Unlock()
|
||||
|
||||
if ready {
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// first access using the standard view
|
||||
err = f(context.Background(), ss, block4.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// nothing should be reified
|
||||
waitForReification()
|
||||
for _, blk := range allBlocks {
|
||||
has, err := hot.Has(context.Background(), blk.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if has {
|
||||
t.Fatal("block unexpectedly reified")
|
||||
}
|
||||
}
|
||||
|
||||
// now make the hot/reifying view and ensure access reifies
|
||||
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// everything should be reified
|
||||
waitForReification()
|
||||
for i, blk := range allBlocks {
|
||||
has, err := hot.Has(context.Background(), blk.Cid())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !has {
|
||||
t.Fatalf("block%d was not reified", i+1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitStoreReification(t *testing.T) {
|
||||
EnableReification = true
|
||||
t.Log("test reification with Has")
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
_, err := s.Has(ctx, c)
|
||||
return err
|
||||
})
|
||||
t.Log("test reification with Get")
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
_, err := s.Get(ctx, c)
|
||||
return err
|
||||
})
|
||||
t.Log("test reification with GetSize")
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
_, err := s.GetSize(ctx, c)
|
||||
return err
|
||||
})
|
||||
t.Log("test reification with View")
|
||||
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
|
||||
return s.View(ctx, c, func(_ []byte) error { return nil })
|
||||
})
|
||||
}
|
||||
|
||||
type mockChain struct {
|
||||
t testing.TB
|
||||
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
|
||||
/* inline-gen end */
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
||||
@ -93,6 +94,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, sm *stmgr.StateManager
|
||||
partDone()
|
||||
}()
|
||||
|
||||
ctx = blockstore.WithHotView(ctx)
|
||||
makeVmWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (vm.VMI, error) {
|
||||
filVested, err := sm.GetFilVested(ctx, e)
|
||||
if err != nil {
|
||||
|
@ -165,13 +165,8 @@ func DefaultUpgradeSchedule() stmgr.UpgradeSchedule {
|
||||
Migration: UpgradeActorsV7,
|
||||
PreMigrations: []stmgr.PreMigration{{
|
||||
PreMigration: PreUpgradeActorsV7,
|
||||
StartWithin: 120,
|
||||
StartWithin: 180,
|
||||
DontStartWithin: 60,
|
||||
StopWithin: 35,
|
||||
}, {
|
||||
PreMigration: PreUpgradeActorsV7,
|
||||
StartWithin: 30,
|
||||
DontStartWithin: 15,
|
||||
StopWithin: 5,
|
||||
}},
|
||||
Expensive: true,
|
||||
@ -1264,7 +1259,7 @@ func upgradeActorsV7Common(
|
||||
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
|
||||
config nv15.Config,
|
||||
) (cid.Cid, error) {
|
||||
writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB)
|
||||
writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB/4)
|
||||
// TODO: pretty sure we'd achieve nothing by doing this, confirm in review
|
||||
//buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
|
||||
store := store.ActorStore(ctx, writeStore)
|
||||
|
224
chain/messagepool/check_test.go
Normal file
224
chain/messagepool/check_test.go
Normal file
@ -0,0 +1,224 @@
|
||||
//stm: #unit
|
||||
package messagepool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
|
||||
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
|
||||
)
|
||||
|
||||
func init() {
|
||||
_ = logging.SetLogLevel("*", "INFO")
|
||||
}
|
||||
|
||||
func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) {
|
||||
for i := 0; i < len(msgStatuses); i++ {
|
||||
iMsgStatuses := msgStatuses[i]
|
||||
if iMsgStatuses.CheckStatus.Code == statusCode {
|
||||
return &iMsgStatuses, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode)
|
||||
}
|
||||
|
||||
func TestCheckMessages(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tma.setBalance(sender, 1000e15)
|
||||
target := mock.Address(1001)
|
||||
|
||||
var protos []*api.MessagePrototype
|
||||
for i := 0; i < 5; i++ {
|
||||
msg := &types.Message{
|
||||
To: target,
|
||||
From: sender,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: uint64(i),
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 2<<10),
|
||||
}
|
||||
proto := &api.MessagePrototype{
|
||||
Message: *msg,
|
||||
ValidNonce: true,
|
||||
}
|
||||
protos = append(protos, proto)
|
||||
}
|
||||
|
||||
messageStatuses, err := mp.CheckMessages(context.TODO(), protos)
|
||||
assert.NoError(t, err)
|
||||
for i := 0; i < len(messageStatuses); i++ {
|
||||
iMsgStatuses := messageStatuses[i]
|
||||
for j := 0; j < len(iMsgStatuses); j++ {
|
||||
jStatus := iMsgStatuses[i]
|
||||
assert.True(t, jStatus.OK)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckPendingMessages(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tma.setBalance(sender, 1000e15)
|
||||
target := mock.Address(1001)
|
||||
|
||||
// add a valid message to the pool
|
||||
msg := &types.Message{
|
||||
To: target,
|
||||
From: sender,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 2<<10),
|
||||
}
|
||||
|
||||
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sm := &types.SignedMessage{
|
||||
Message: *msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender)
|
||||
assert.NoError(t, err)
|
||||
for i := 0; i < len(messageStatuses); i++ {
|
||||
iMsgStatuses := messageStatuses[i]
|
||||
for j := 0; j < len(iMsgStatuses); j++ {
|
||||
jStatus := iMsgStatuses[i]
|
||||
assert.True(t, jStatus.OK)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckReplaceMessages(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tma.setBalance(sender, 1000e15)
|
||||
target := mock.Address(1001)
|
||||
|
||||
// add a valid message to the pool
|
||||
msg := &types.Message{
|
||||
To: target,
|
||||
From: sender,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 2<<10),
|
||||
}
|
||||
|
||||
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sm := &types.SignedMessage{
|
||||
Message: *msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
// create a new message with the same data, except that it is too big
|
||||
var msgs []*types.Message
|
||||
invalidmsg := &types.Message{
|
||||
To: target,
|
||||
From: sender,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 128<<10),
|
||||
}
|
||||
msgs = append(msgs, invalidmsg)
|
||||
|
||||
{
|
||||
messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < len(messageStatuses); i++ {
|
||||
iMsgStatuses := messageStatuses[i]
|
||||
|
||||
status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// the replacement message should cause a status error
|
||||
assert.False(t, status.OK)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
|
||||
}
|
||||
|
||||
func TestMessagePool(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_GET_NONCE_001
|
||||
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) {
|
||||
Message: *msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
assert.ErrorIs(t, err, ErrMessageTooBig)
|
||||
}
|
||||
@ -760,3 +764,302 @@ func TestUpdates(t *testing.T) {
|
||||
t.Fatal("expected closed channel, but got an update instead")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessageBelowMinGasFee(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
// fee is just below minimum gas fee
|
||||
fee := minimumBaseFee.Uint64() - 1
|
||||
{
|
||||
msg := &types.Message{
|
||||
To: to,
|
||||
From: from,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(fee),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 32<<10),
|
||||
}
|
||||
|
||||
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sm := &types.SignedMessage{
|
||||
Message: *msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
assert.ErrorIs(t, err, ErrGasFeeCapTooLow)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessageValueTooHigh(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
totalFil := types.TotalFilecoinInt
|
||||
extra := types.NewInt(1)
|
||||
|
||||
value := types.BigAdd(totalFil, extra)
|
||||
{
|
||||
msg := &types.Message{
|
||||
To: to,
|
||||
From: from,
|
||||
Value: value,
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 32<<10),
|
||||
}
|
||||
|
||||
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sm := &types.SignedMessage{
|
||||
Message: *msg,
|
||||
Signature: *sig,
|
||||
}
|
||||
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMessageSignatureInvalid(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
msg := &types.Message{
|
||||
To: to,
|
||||
From: from,
|
||||
Value: types.NewInt(1),
|
||||
Nonce: 0,
|
||||
GasLimit: 50000000,
|
||||
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
|
||||
GasPremium: types.NewInt(1),
|
||||
Params: make([]byte, 32<<10),
|
||||
}
|
||||
|
||||
badSig := &crypto.Signature{
|
||||
Type: crypto.SigTypeSecp256k1,
|
||||
Data: make([]byte, 0),
|
||||
}
|
||||
sm := &types.SignedMessage{
|
||||
Message: *msg,
|
||||
Signature: *badSig,
|
||||
}
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
assert.Error(t, err)
|
||||
// assert.Contains(t, err.Error(), "invalid signature length")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessageTwice(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
// create a valid messages
|
||||
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
// try to add it twice
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
// assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessageTwiceNonceGap(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
// create message with invalid nonce (1)
|
||||
sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64())
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
// then try to add message again
|
||||
err = mp.Add(context.TODO(), sm)
|
||||
// assert.Contains(t, err.Error(), "unfulfilled nonce gap")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessageTwiceCidDiff(t *testing.T) {
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
// Create message with different data, so CID is different
|
||||
sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64())
|
||||
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
// then try to add message again
|
||||
err = mp.Add(context.TODO(), sm2)
|
||||
// assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
|
||||
assert.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddMessageTwiceCidDiffReplaced(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
// Create message with different data, so CID is different
|
||||
sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2)
|
||||
mustAdd(t, mp, sm2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveMessage(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
tma := newTestMpoolAPI()
|
||||
|
||||
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
|
||||
assert.NoError(t, err)
|
||||
|
||||
from, err := w.WalletNew(context.Background(), types.KTBLS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
tma.setBalance(from, 1000e9)
|
||||
|
||||
ds := datastore.NewMapDatastore()
|
||||
|
||||
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
|
||||
assert.NoError(t, err)
|
||||
|
||||
to := mock.Address(1001)
|
||||
|
||||
{
|
||||
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
|
||||
mustAdd(t, mp, sm)
|
||||
|
||||
//stm: @CHAIN_MEMPOOL_REMOVE_001
|
||||
// remove message for sender
|
||||
mp.Remove(context.TODO(), from, sm.Message.Nonce, true)
|
||||
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_FOR_001
|
||||
// check messages in pool: should be none present
|
||||
msgs := mp.pendingFor(context.TODO(), from)
|
||||
assert.Len(t, msgs, 0)
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
//stm: #unit
|
||||
package messagepool
|
||||
|
||||
import (
|
||||
@ -16,6 +17,7 @@ import (
|
||||
)
|
||||
|
||||
func TestRepubMessages(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001
|
||||
oldRepublishBatchDelay := RepublishBatchDelay
|
||||
RepublishBatchDelay = time.Microsecond
|
||||
defer func() {
|
||||
@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) {
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001
|
||||
_, err := mp.Push(context.TODO(), m)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -1,3 +1,4 @@
|
||||
//stm: #unit
|
||||
package messagepool
|
||||
|
||||
import (
|
||||
@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) {
|
||||
}
|
||||
|
||||
func TestMessageChains(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMessageChainSkipping(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
|
||||
|
||||
// regression test for chain skip bug
|
||||
|
||||
mp, tma := makeTestMpool()
|
||||
@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBasicMessageSelection(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
oldMaxNonceGap := MaxNonceGap
|
||||
MaxNonceGap = 1000
|
||||
defer func() {
|
||||
@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingGas(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPriorityMessageSelection(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPriorityMessageSelection2(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPriorityMessageSelection3(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
mp, tma := makeTestMpool()
|
||||
|
||||
// the actors
|
||||
@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOptimalMessageSelection1(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
// this test uses just a single actor sending messages with a low tq
|
||||
// the chain depenent merging algorithm should pick messages from the actor
|
||||
// from the start
|
||||
@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOptimalMessageSelection2(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
// this test uses two actors sending messages to each other, with the first
|
||||
// actor paying (much) higher gas premium than the second.
|
||||
// We select with a low ticket quality; the chain depenent merging algorithm should pick
|
||||
@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestOptimalMessageSelection3(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
// this test uses 10 actors sending a block of messages to each other, with the the first
|
||||
// actors paying higher gas premium than the subsequent actors.
|
||||
// We select with a low ticket quality; the chain dependent merging algorithm should pick
|
||||
@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 {
|
||||
}
|
||||
|
||||
func TestCompetitiveMessageSelectionExp(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
if testing.Short() {
|
||||
t.Skip("skipping in short mode")
|
||||
}
|
||||
@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCompetitiveMessageSelectionZipf(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
if testing.Short() {
|
||||
t.Skip("skipping in short mode")
|
||||
}
|
||||
@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGasReward(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001
|
||||
tests := []struct {
|
||||
Premium uint64
|
||||
FeeCap uint64
|
||||
@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRealWorldSelection(t *testing.T) {
|
||||
//stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
|
||||
// load test-messages.json.gz and rewrite the messages so that
|
||||
// 1) we map each real actor to a test actor so that we can sign the messages
|
||||
// 2) adjust the nonces so that they start from 0
|
||||
|
@ -126,7 +126,7 @@ func infoCmdAct(cctx *cli.Context) error {
|
||||
|
||||
alerts, err := minerApi.LogAlerts(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting alerts: %w", err)
|
||||
fmt.Printf("ERROR: getting alerts: %s\n", err)
|
||||
}
|
||||
|
||||
activeAlerts := make([]alerting.Alert, 0)
|
||||
|
@ -96,6 +96,11 @@ var infoAllCmd = &cli.Command{
|
||||
fmt.Println("ERROR: ", err)
|
||||
}
|
||||
|
||||
fmt.Println("\n#: Storage Locks")
|
||||
if err := storageLocks.Action(cctx); err != nil {
|
||||
fmt.Println("ERROR: ", err)
|
||||
}
|
||||
|
||||
fmt.Println("\n#: Sched Diag")
|
||||
if err := sealingSchedDiagCmd.Action(cctx); err != nil {
|
||||
fmt.Println("ERROR: ", err)
|
||||
@ -192,6 +197,11 @@ var infoAllCmd = &cli.Command{
|
||||
fmt.Println("ERROR: ", err)
|
||||
}
|
||||
|
||||
fmt.Println("\n#: Storage Sector List")
|
||||
if err := storageListSectorsCmd.Action(cctx); err != nil {
|
||||
fmt.Println("ERROR: ", err)
|
||||
}
|
||||
|
||||
fmt.Println("\n#: Expired Sectors")
|
||||
if err := sectorsExpiredCmd.Action(cctx); err != nil {
|
||||
fmt.Println("ERROR: ", err)
|
||||
|
@ -467,12 +467,15 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
||||
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
|
||||
|
||||
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
|
||||
ParallelFetchLimit: 10,
|
||||
AllowAddPiece: true,
|
||||
AllowPreCommit1: true,
|
||||
AllowPreCommit2: true,
|
||||
AllowCommit: true,
|
||||
AllowUnseal: true,
|
||||
ParallelFetchLimit: 10,
|
||||
AllowAddPiece: true,
|
||||
AllowPreCommit1: true,
|
||||
AllowPreCommit2: true,
|
||||
AllowCommit: true,
|
||||
AllowUnseal: true,
|
||||
AllowReplicaUpdate: true,
|
||||
AllowProveReplicaUpdate2: true,
|
||||
AllowRegenSectorKey: true,
|
||||
}, wsts, smsts)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -161,7 +161,7 @@ var sectorsStatusCmd = &cli.Command{
|
||||
fmt.Printf("Expiration:\t\t%v\n", status.Expiration)
|
||||
fmt.Printf("DealWeight:\t\t%v\n", status.DealWeight)
|
||||
fmt.Printf("VerifiedDealWeight:\t\t%v\n", status.VerifiedDealWeight)
|
||||
fmt.Printf("InitialPledge:\t\t%v\n", status.InitialPledge)
|
||||
fmt.Printf("InitialPledge:\t\t%v\n", types.FIL(status.InitialPledge))
|
||||
fmt.Printf("\nExpiration Info\n")
|
||||
fmt.Printf("OnTime:\t\t%v\n", status.OnTime)
|
||||
fmt.Printf("Early:\t\t%v\n", status.Early)
|
||||
@ -294,8 +294,14 @@ var sectorsListCmd = &cli.Command{
|
||||
Aliases: []string{"e"},
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "seal-time",
|
||||
Usage: "display how long it took for the sector to be sealed",
|
||||
Name: "initial-pledge",
|
||||
Usage: "display initial pledge",
|
||||
Aliases: []string{"p"},
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "seal-time",
|
||||
Usage: "display how long it took for the sector to be sealed",
|
||||
Aliases: []string{"t"},
|
||||
},
|
||||
&cli.StringFlag{
|
||||
Name: "states",
|
||||
@ -405,6 +411,7 @@ var sectorsListCmd = &cli.Command{
|
||||
tablewriter.Col("Deals"),
|
||||
tablewriter.Col("DealWeight"),
|
||||
tablewriter.Col("VerifiedPower"),
|
||||
tablewriter.Col("Pledge"),
|
||||
tablewriter.NewLineCol("Error"),
|
||||
tablewriter.NewLineCol("RecoveryTimeout"))
|
||||
|
||||
@ -483,6 +490,9 @@ var sectorsListCmd = &cli.Command{
|
||||
m["RecoveryTimeout"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early))
|
||||
}
|
||||
}
|
||||
if inSSet && cctx.Bool("initial-pledge") {
|
||||
m["Pledge"] = types.FIL(st.InitialPledge).Short()
|
||||
}
|
||||
}
|
||||
|
||||
if !fast && deals > 0 {
|
||||
|
@ -368,6 +368,7 @@ type storedSector struct {
|
||||
store stores.SectorStorageInfo
|
||||
|
||||
unsealed, sealed, cache bool
|
||||
update, updatecache bool
|
||||
}
|
||||
|
||||
var storageFindCmd = &cli.Command{
|
||||
@ -421,6 +422,16 @@ var storageFindCmd = &cli.Command{
|
||||
return xerrors.Errorf("finding cache: %w", err)
|
||||
}
|
||||
|
||||
us, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdate, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding sealed: %w", err)
|
||||
}
|
||||
|
||||
uc, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdateCache, 0, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding cache: %w", err)
|
||||
}
|
||||
|
||||
byId := map[stores.ID]*storedSector{}
|
||||
for _, info := range u {
|
||||
sts, ok := byId[info.ID]
|
||||
@ -455,6 +466,28 @@ var storageFindCmd = &cli.Command{
|
||||
}
|
||||
sts.cache = true
|
||||
}
|
||||
for _, info := range us {
|
||||
sts, ok := byId[info.ID]
|
||||
if !ok {
|
||||
sts = &storedSector{
|
||||
id: info.ID,
|
||||
store: info,
|
||||
}
|
||||
byId[info.ID] = sts
|
||||
}
|
||||
sts.update = true
|
||||
}
|
||||
for _, info := range uc {
|
||||
sts, ok := byId[info.ID]
|
||||
if !ok {
|
||||
sts = &storedSector{
|
||||
id: info.ID,
|
||||
store: info,
|
||||
}
|
||||
byId[info.ID] = sts
|
||||
}
|
||||
sts.updatecache = true
|
||||
}
|
||||
|
||||
local, err := nodeApi.StorageLocal(ctx)
|
||||
if err != nil {
|
||||
@ -480,6 +513,12 @@ var storageFindCmd = &cli.Command{
|
||||
if info.cache {
|
||||
types += "Cache, "
|
||||
}
|
||||
if info.update {
|
||||
types += "Update, "
|
||||
}
|
||||
if info.updatecache {
|
||||
types += "UpdateCache, "
|
||||
}
|
||||
|
||||
fmt.Printf("In %s (%s)\n", info.id, types[:len(types)-2])
|
||||
fmt.Printf("\tSealing: %t; Storage: %t\n", info.store.CanSeal, info.store.CanStore)
|
||||
|
@ -173,6 +173,11 @@ var runCmd = &cli.Command{
|
||||
Usage: "enable prove replica update 2",
|
||||
Value: true,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "regen-sector-key",
|
||||
Usage: "enable regen sector key",
|
||||
Value: true,
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "parallel-fetch-limit",
|
||||
Usage: "maximum fetch operations to run in parallel",
|
||||
@ -278,12 +283,15 @@ var runCmd = &cli.Command{
|
||||
if cctx.Bool("commit") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTCommit2)
|
||||
}
|
||||
if cctx.Bool("replicaupdate") {
|
||||
if cctx.Bool("replica-update") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate)
|
||||
}
|
||||
if cctx.Bool("prove-replica-update2") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2)
|
||||
}
|
||||
if cctx.Bool("regen-sector-key") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey)
|
||||
}
|
||||
|
||||
if len(taskTypes) == 0 {
|
||||
return xerrors.Errorf("no task types specified")
|
||||
|
@ -22,11 +22,14 @@ var tasksCmd = &cli.Command{
|
||||
}
|
||||
|
||||
var allowSetting = map[sealtasks.TaskType]struct{}{
|
||||
sealtasks.TTAddPiece: {},
|
||||
sealtasks.TTPreCommit1: {},
|
||||
sealtasks.TTPreCommit2: {},
|
||||
sealtasks.TTCommit2: {},
|
||||
sealtasks.TTUnseal: {},
|
||||
sealtasks.TTAddPiece: {},
|
||||
sealtasks.TTPreCommit1: {},
|
||||
sealtasks.TTPreCommit2: {},
|
||||
sealtasks.TTCommit2: {},
|
||||
sealtasks.TTUnseal: {},
|
||||
sealtasks.TTReplicaUpdate: {},
|
||||
sealtasks.TTProveReplicaUpdate2: {},
|
||||
sealtasks.TTRegenSectorKey: {},
|
||||
}
|
||||
|
||||
var settableStr = func() string {
|
||||
|
104
cmd/lotus-shed/diff.go
Normal file
104
cmd/lotus-shed/diff.go
Normal file
@ -0,0 +1,104 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
)
|
||||
|
||||
var diffCmd = &cli.Command{
|
||||
Name: "diff",
|
||||
Usage: "diff state objects",
|
||||
Subcommands: []*cli.Command{diffStateTrees},
|
||||
}
|
||||
|
||||
var diffStateTrees = &cli.Command{
|
||||
Name: "state-trees",
|
||||
Usage: "diff two state-trees",
|
||||
ArgsUsage: "<state-tree-a> <state-tree-b>",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
if cctx.NArg() != 2 {
|
||||
return xerrors.Errorf("expected two state-tree roots")
|
||||
}
|
||||
|
||||
argA := cctx.Args().Get(1)
|
||||
rootA, err := cid.Parse(argA)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("first state-tree root (%q) is not a CID: %w", argA, err)
|
||||
}
|
||||
argB := cctx.Args().Get(1)
|
||||
rootB, err := cid.Parse(argB)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("second state-tree root (%q) is not a CID: %w", argB, err)
|
||||
}
|
||||
|
||||
if rootA == rootB {
|
||||
fmt.Println("state trees do not differ")
|
||||
return nil
|
||||
}
|
||||
|
||||
changedB, err := api.StateChangedActors(ctx, rootA, rootB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
changedA, err := api.StateChangedActors(ctx, rootB, rootA)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
diff := func(stateA, stateB types.Actor) {
|
||||
if stateB.Code != stateA.Code {
|
||||
fmt.Printf(" code: %s != %s\n", stateA.Code, stateB.Code)
|
||||
}
|
||||
if stateB.Head != stateA.Head {
|
||||
fmt.Printf(" state: %s != %s\n", stateA.Head, stateB.Head)
|
||||
}
|
||||
if stateB.Nonce != stateA.Nonce {
|
||||
fmt.Printf(" nonce: %d != %d\n", stateA.Nonce, stateB.Nonce)
|
||||
}
|
||||
if !stateB.Balance.Equals(stateA.Balance) {
|
||||
fmt.Printf(" balance: %s != %s\n", stateA.Balance, stateB.Balance)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("state differences between %s (first) and %s (second):\n\n", rootA, rootB)
|
||||
for addr, stateA := range changedA {
|
||||
fmt.Println(addr)
|
||||
stateB, ok := changedB[addr]
|
||||
if ok {
|
||||
diff(stateA, stateB)
|
||||
continue
|
||||
} else {
|
||||
fmt.Printf(" actor does not exist in second state-tree (%s)\n", rootB)
|
||||
}
|
||||
fmt.Println()
|
||||
delete(changedB, addr)
|
||||
}
|
||||
for addr, stateB := range changedB {
|
||||
fmt.Println(addr)
|
||||
stateA, ok := changedA[addr]
|
||||
if ok {
|
||||
diff(stateA, stateB)
|
||||
continue
|
||||
} else {
|
||||
fmt.Printf(" actor does not exist in first state-tree (%s)\n", rootA)
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
@ -68,6 +68,7 @@ func main() {
|
||||
sendCsvCmd,
|
||||
terminationsCmd,
|
||||
migrationsCmd,
|
||||
diffCmd,
|
||||
}
|
||||
|
||||
app := &cli.App{
|
||||
|
@ -1621,14 +1621,15 @@ USAGE:
|
||||
lotus-miner sectors list [command options] [arguments...]
|
||||
|
||||
OPTIONS:
|
||||
--show-removed, -r show removed sectors (default: false)
|
||||
--color, -c use color in display output (default: depends on output being a TTY)
|
||||
--fast, -f don't show on-chain info for better performance (default: false)
|
||||
--events, -e display number of events the sector has received (default: false)
|
||||
--seal-time display how long it took for the sector to be sealed (default: false)
|
||||
--states value filter sectors by a comma-separated list of states
|
||||
--unproven, -u only show sectors which aren't in the 'Proving' state (default: false)
|
||||
--help, -h show help (default: false)
|
||||
--show-removed, -r show removed sectors (default: false)
|
||||
--color, -c use color in display output (default: depends on output being a TTY)
|
||||
--fast, -f don't show on-chain info for better performance (default: false)
|
||||
--events, -e display number of events the sector has received (default: false)
|
||||
--initial-pledge, -p display initial pledge (default: false)
|
||||
--seal-time, -t display how long it took for the sector to be sealed (default: false)
|
||||
--states value filter sectors by a comma-separated list of states
|
||||
--unproven, -u only show sectors which aren't in the 'Proving' state (default: false)
|
||||
--help, -h show help (default: false)
|
||||
|
||||
```
|
||||
|
||||
|
@ -46,6 +46,7 @@ OPTIONS:
|
||||
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
|
||||
--replica-update enable replica update (default: true)
|
||||
--prove-replica-update2 enable prove replica update 2 (default: true)
|
||||
--regen-sector-key enable regen sector key (default: true)
|
||||
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
|
||||
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
|
||||
--help, -h show help (default: false)
|
||||
@ -170,7 +171,7 @@ NAME:
|
||||
lotus-worker tasks enable - Enable a task type
|
||||
|
||||
USAGE:
|
||||
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|AP]
|
||||
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
@ -183,7 +184,7 @@ NAME:
|
||||
lotus-worker tasks disable - Disable a task type
|
||||
|
||||
USAGE:
|
||||
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|AP]
|
||||
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
@ -438,6 +438,9 @@
|
||||
# env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
|
||||
#AllowProveReplicaUpdate2 = true
|
||||
|
||||
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
|
||||
#AllowRegenSectorKey = true
|
||||
|
||||
# env var: LOTUS_STORAGE_RESOURCEFILTERING
|
||||
#ResourceFiltering = "hardware"
|
||||
|
||||
|
4
extern/sector-storage/manager.go
vendored
4
extern/sector-storage/manager.go
vendored
@ -105,6 +105,7 @@ type SealerConfig struct {
|
||||
AllowUnseal bool
|
||||
AllowReplicaUpdate bool
|
||||
AllowProveReplicaUpdate2 bool
|
||||
AllowRegenSectorKey bool
|
||||
|
||||
// ResourceFiltering instructs the system which resource filtering strategy
|
||||
// to use when evaluating tasks against this worker. An empty value defaults
|
||||
@ -169,6 +170,9 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
|
||||
if sc.AllowProveReplicaUpdate2 {
|
||||
localTasks = append(localTasks, sealtasks.TTProveReplicaUpdate2)
|
||||
}
|
||||
if sc.AllowRegenSectorKey {
|
||||
localTasks = append(localTasks, sealtasks.TTRegenSectorKey)
|
||||
}
|
||||
|
||||
wcfg := WorkerConfig{
|
||||
IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,
|
||||
|
9
extern/storage-sealing/checks.go
vendored
9
extern/storage-sealing/checks.go
vendored
@ -214,8 +214,13 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf
|
||||
if err != nil {
|
||||
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
|
||||
}
|
||||
if si.UpdateUnsealed == nil || !commD.Equals(*si.UpdateUnsealed) {
|
||||
return &ErrBadRU{xerrors.Errorf("on chain CommD differs from sector: %s != %s", commD, si.CommD)}
|
||||
|
||||
if si.UpdateUnsealed == nil {
|
||||
return &ErrBadRU{xerrors.New("nil UpdateUnsealed cid after replica update")}
|
||||
}
|
||||
|
||||
if !commD.Equals(*si.UpdateUnsealed) {
|
||||
return &ErrBadRU{xerrors.Errorf("calculated CommD differs from updated replica: %s != %s", commD, *si.UpdateUnsealed)}
|
||||
}
|
||||
|
||||
if si.UpdateSealed == nil {
|
||||
|
2
go.mod
2
go.mod
@ -118,7 +118,7 @@ require (
|
||||
github.com/libp2p/go-libp2p-pubsub v0.6.1
|
||||
github.com/libp2p/go-libp2p-quic-transport v0.16.1
|
||||
github.com/libp2p/go-libp2p-record v0.1.3
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.3
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.4
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
|
||||
github.com/libp2p/go-libp2p-swarm v0.10.1
|
||||
github.com/libp2p/go-libp2p-tls v0.3.1
|
||||
|
3
go.sum
3
go.sum
@ -1156,8 +1156,9 @@ github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd
|
||||
github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0=
|
||||
github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4=
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o=
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw=
|
||||
github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
|
||||
github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys=
|
||||
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
|
||||
github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY=
|
||||
|
@ -3,6 +3,7 @@ package kit
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -10,6 +11,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool {
|
||||
return uint64(len(p.partitions)) == p.count(t)
|
||||
}
|
||||
|
||||
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) {
|
||||
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) {
|
||||
defer func() {
|
||||
ret = p.done(t)
|
||||
}()
|
||||
msg := smsg.Message
|
||||
if !(msg.To == bm.miner.ActorAddr) {
|
||||
return
|
||||
}
|
||||
@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type
|
||||
return
|
||||
}
|
||||
|
||||
func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) {
|
||||
|
||||
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
|
||||
if !tracker.done(bm.t) { // need to wait for post
|
||||
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
|
||||
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events
|
||||
require.NoError(bm.t, err)
|
||||
|
||||
// First check pending messages we'll mine this epoch
|
||||
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs {
|
||||
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
|
||||
fmt.Printf("found post in mempool pending\n")
|
||||
}
|
||||
}
|
||||
|
||||
// Account for included but not yet executed messages
|
||||
for _, bc := range ts.Cids() {
|
||||
msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs.BlsMessages {
|
||||
if tracker.recordIfPost(bm.t, bm, msg) {
|
||||
fmt.Printf("found post in message of prev tipset\n")
|
||||
}
|
||||
|
||||
}
|
||||
for _, msg := range msgs.SecpkMessages {
|
||||
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
|
||||
fmt.Printf("found post in message of prev tipset\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// post not yet in mpool, wait for it
|
||||
if !tracker.done(bm.t) {
|
||||
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
|
||||
POOL:
|
||||
for {
|
||||
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt := <-poolEvts:
|
||||
bm.t.Logf("pool event: %d", evt.Type)
|
||||
if evt.Type == api.MpoolAdd {
|
||||
bm.t.Logf("incoming message %v", evt.Message)
|
||||
if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) {
|
||||
fmt.Printf("found post in mempool evt\n")
|
||||
break POOL
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bm.t.Logf("done waiting on mpool")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
|
||||
// and everything shuts down if a post fails. It also enforces that every block mined succeeds
|
||||
func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) {
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// wrap context in a cancellable context.
|
||||
ctx, bm.cancel = context.WithCancel(ctx)
|
||||
@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
go func() {
|
||||
defer bm.wg.Done()
|
||||
|
||||
activeDeadlines := make(map[int]struct{})
|
||||
_ = activeDeadlines
|
||||
ts, err := bm.miner.FullNode.ChainHead(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
wait := make(chan bool)
|
||||
@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
require.NoError(bm.t, err)
|
||||
// read current out
|
||||
curr := <-chg
|
||||
require.Equal(bm.t, ts.Height(), curr[0].Val.Height())
|
||||
require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?")
|
||||
for {
|
||||
select {
|
||||
case <-time.After(blocktime):
|
||||
@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
return
|
||||
}
|
||||
nulls := atomic.SwapInt64(&bm.nextNulls, 0)
|
||||
require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported")
|
||||
|
||||
// Wake up and figure out if we are at the end of an active deadline
|
||||
ts, err := bm.miner.FullNode.ChainHead(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
tsk := ts.Key()
|
||||
|
||||
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk)
|
||||
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key())
|
||||
require.NoError(bm.t, err)
|
||||
if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted
|
||||
|
||||
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
|
||||
if !tracker.done(bm.t) { // need to wait for post
|
||||
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
|
||||
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
|
||||
// First check pending messages we'll mine this epoch
|
||||
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs {
|
||||
tracker.recordIfPost(bm.t, bm, msg)
|
||||
}
|
||||
|
||||
// post not yet in mpool, wait for it
|
||||
if !tracker.done(bm.t) {
|
||||
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
|
||||
POOL:
|
||||
for {
|
||||
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt := <-poolEvts:
|
||||
bm.t.Logf("pool event: %d", evt.Type)
|
||||
if evt.Type == api.MpoolAdd {
|
||||
bm.t.Logf("incoming message %v", evt.Message)
|
||||
if tracker.recordIfPost(bm.t, bm, evt.Message) {
|
||||
break POOL
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bm.t.Logf("done waiting on mpool")
|
||||
}
|
||||
}
|
||||
if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
|
||||
bm.forcePoSt(ctx, ts, dlinfo)
|
||||
}
|
||||
|
||||
var target abi.ChainEpoch
|
||||
@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
Done: reportSuccessFn,
|
||||
})
|
||||
success = <-wait
|
||||
if !success {
|
||||
// if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
|
||||
if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
|
||||
bm.forcePoSt(ctx, ts, dlinfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until it shows up on the given full nodes ChainHead
|
||||
|
34
itests/kit/circuit.go
Normal file
34
itests/kit/circuit.go
Normal file
@ -0,0 +1,34 @@
|
||||
package kit
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish.
|
||||
|
||||
This is how it works:
|
||||
- It runs the `cb` function until it returns true,
|
||||
- waiting for `throttle` duration between each iteration,
|
||||
- or at most `timeout` duration until it breaks test execution.
|
||||
|
||||
You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out,
|
||||
or you need to set different deadlines in the same test.
|
||||
*/
|
||||
func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) {
|
||||
tmo := time.After(timeout)
|
||||
for {
|
||||
if cb() {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-tmo:
|
||||
t.Fatal("timeout: ", label)
|
||||
default:
|
||||
fmt.Printf("waiting: %s\n", label)
|
||||
time.Sleep(throttle)
|
||||
}
|
||||
}
|
||||
}
|
521
itests/mempool_test.go
Normal file
521
itests/mempool_test.go
Normal file
@ -0,0 +1,521 @@
|
||||
//stm: #integration
|
||||
package itests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const mPoolThrottle = time.Millisecond * 100
|
||||
const mPoolTimeout = time.Second * 10
|
||||
|
||||
func TestMemPoolPushSingleNode(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_002
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
|
||||
addr, err := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
require.NoError(t, err)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
// add messages to be mined/published
|
||||
var sms []*types.SignedMessage
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
msg := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
}
|
||||
|
||||
sm, err := firstNode.MpoolPushMessage(ctx, msg, nil)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, i, sm.Message.Nonce)
|
||||
|
||||
sms = append(sms, sm)
|
||||
}
|
||||
|
||||
// check pending messages for address
|
||||
kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
|
||||
if len(msgStatuses) == totalMessages {
|
||||
for _, msgStatusList := range msgStatuses {
|
||||
for _, status := range msgStatusList {
|
||||
require.True(t, status.OK)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
// verify messages should be the ones included in the next block
|
||||
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
|
||||
for _, msg := range sms {
|
||||
found := false
|
||||
for _, selectedMsg := range selected {
|
||||
if selectedMsg.Cid() == msg.Cid() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, found)
|
||||
}
|
||||
|
||||
ens.BeginMining(blockTime)
|
||||
|
||||
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
// pool pending list should be empty
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(pending) == 0 {
|
||||
// all messages should be added to the chain
|
||||
for _, lookMsg := range sms {
|
||||
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func TestMemPoolPushTwoNodes(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_002
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
sender2 := secondNode.DefaultKey.Address
|
||||
|
||||
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
addr2, _ := secondNode.WalletNew(ctx, types.KTBLS)
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
var sms []*types.SignedMessage
|
||||
// push messages to message pools of both nodes
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
// first
|
||||
msg1 := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
}
|
||||
|
||||
sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, i, sm1.Message.Nonce)
|
||||
sms = append(sms, sm1)
|
||||
|
||||
// second
|
||||
msg2 := &types.Message{
|
||||
From: sender2,
|
||||
To: addr2,
|
||||
Value: each,
|
||||
}
|
||||
|
||||
sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, i, sm2.Message.Nonce)
|
||||
sms = append(sms, sm2)
|
||||
}
|
||||
|
||||
ens.BeginMining(blockTime)
|
||||
|
||||
kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(pending1) == 0 && len(pending2) == 0 {
|
||||
// Check messages on both nodes
|
||||
for _, lookMsg := range sms {
|
||||
msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup1)
|
||||
|
||||
msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup2)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func TestMemPoolClearPending(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001
|
||||
//stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
|
||||
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
// Add single message, then clear the pool
|
||||
msg := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
}
|
||||
_, err = firstNode.MpoolPushMessage(ctx, msg, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// message should be in the mempool
|
||||
kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
return len(pending) == 1
|
||||
})
|
||||
|
||||
err = firstNode.MpoolClear(ctx, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// pool should be empty now
|
||||
kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
return len(pending) == 0
|
||||
})
|
||||
|
||||
// mine a couple of blocks
|
||||
ens.BeginMining(blockTime)
|
||||
time.Sleep(5 * blockTime)
|
||||
|
||||
// make sure that the cleared message wasn't picked up and mined
|
||||
_, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestMemPoolBatchPush(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
|
||||
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_001
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
|
||||
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
// add messages to be mined/published
|
||||
var sms []*types.SignedMessage
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
msg := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
Nonce: uint64(i),
|
||||
GasLimit: 50_000_000,
|
||||
GasFeeCap: types.NewInt(100_000_000),
|
||||
GasPremium: types.NewInt(1),
|
||||
}
|
||||
|
||||
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
sms = append(sms, signedMessage)
|
||||
}
|
||||
|
||||
_, err = firstNode.MpoolBatchPush(ctx, sms)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check pending messages for address
|
||||
kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(msgStatuses) == totalMessages {
|
||||
for _, msgStatusList := range msgStatuses {
|
||||
for _, status := range msgStatusList {
|
||||
require.True(t, status.OK)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
// verify messages should be the ones included in the next block
|
||||
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
|
||||
require.NoError(t, err)
|
||||
for _, msg := range sms {
|
||||
found := false
|
||||
for _, selectedMsg := range selected {
|
||||
if selectedMsg.Cid() == msg.Cid() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, found)
|
||||
}
|
||||
|
||||
ens.BeginMining(blockTime)
|
||||
|
||||
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
// pool pending list should be empty
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(pending) == 0 {
|
||||
// all messages should be added to the chain
|
||||
for _, lookMsg := range sms {
|
||||
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
|
||||
//stm: @CHAIN_MEMPOOL_PUSH_003
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
|
||||
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
// add messages to be mined/published
|
||||
var sms []*types.SignedMessage
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
msg := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
Nonce: uint64(i),
|
||||
GasLimit: 50_000_000,
|
||||
GasFeeCap: types.NewInt(100_000_000),
|
||||
GasPremium: types.NewInt(1),
|
||||
}
|
||||
|
||||
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
// push untrusted messages
|
||||
pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg.Cid(), pushedCid)
|
||||
|
||||
sms = append(sms, signedMessage)
|
||||
}
|
||||
|
||||
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
// check pending messages for address
|
||||
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
|
||||
|
||||
if len(msgStatuses) == totalMessages {
|
||||
for _, msgStatusList := range msgStatuses {
|
||||
for _, status := range msgStatusList {
|
||||
require.True(t, status.OK)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
// verify messages should be the ones included in the next block
|
||||
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
|
||||
for _, msg := range sms {
|
||||
found := false
|
||||
for _, selectedMsg := range selected {
|
||||
if selectedMsg.Cid() == msg.Cid() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, found)
|
||||
}
|
||||
|
||||
ens.BeginMining(blockTime)
|
||||
|
||||
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
// pool pending list should be empty
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(pending) == 0 {
|
||||
// all messages should be added to the chain
|
||||
for _, lookMsg := range sms {
|
||||
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestMemPoolBatchPushUntrusted(t *testing.T) {
|
||||
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
|
||||
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
|
||||
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
|
||||
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_002
|
||||
ctx := context.Background()
|
||||
const blockTime = 100 * time.Millisecond
|
||||
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
|
||||
ens.InterconnectAll()
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
sender := firstNode.DefaultKey.Address
|
||||
|
||||
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
|
||||
|
||||
const totalMessages = 10
|
||||
|
||||
bal, err := firstNode.WalletBalance(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
toSend := big.Div(bal, big.NewInt(10))
|
||||
each := big.Div(toSend, big.NewInt(totalMessages))
|
||||
|
||||
// add messages to be mined/published
|
||||
var sms []*types.SignedMessage
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
msg := &types.Message{
|
||||
From: sender,
|
||||
To: addr,
|
||||
Value: each,
|
||||
Nonce: uint64(i),
|
||||
GasLimit: 50_000_000,
|
||||
GasFeeCap: types.NewInt(100_000_000),
|
||||
GasPremium: types.NewInt(1),
|
||||
}
|
||||
|
||||
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
sms = append(sms, signedMessage)
|
||||
}
|
||||
|
||||
_, err = firstNode.MpoolBatchPushUntrusted(ctx, sms)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check pending messages for address, wait until they are all pushed
|
||||
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(msgStatuses) == totalMessages {
|
||||
for _, msgStatusList := range msgStatuses {
|
||||
for _, status := range msgStatusList {
|
||||
require.True(t, status.OK)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
// verify messages should be the ones included in the next block
|
||||
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
|
||||
|
||||
for _, msg := range sms {
|
||||
found := false
|
||||
for _, selectedMsg := range selected {
|
||||
if selectedMsg.Cid() == msg.Cid() {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, found)
|
||||
}
|
||||
|
||||
ens.BeginMining(blockTime)
|
||||
|
||||
// wait until pending messages are mined, pool pending list should be empty
|
||||
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
|
||||
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
if len(pending) == 0 {
|
||||
// all messages should be added to the chain
|
||||
for _, lookMsg := range sms {
|
||||
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, msgLookup)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
}
|
@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) {
|
||||
Miner(&miner, &paymentCreator, kit.WithAllSubsystems()).
|
||||
Start().
|
||||
InterconnectAll()
|
||||
bms := ens.BeginMining(blockTime)
|
||||
bms := ens.BeginMiningMustPost(blockTime)
|
||||
bm := bms[0]
|
||||
|
||||
// send some funds to register the receiver
|
||||
|
@ -47,6 +47,12 @@ var (
|
||||
WorkerHostname, _ = tag.NewKey("worker_hostname")
|
||||
StorageID, _ = tag.NewKey("storage_id")
|
||||
SectorState, _ = tag.NewKey("sector_state")
|
||||
|
||||
// rcmgr
|
||||
ServiceID, _ = tag.NewKey("svc")
|
||||
ProtocolID, _ = tag.NewKey("proto")
|
||||
Direction, _ = tag.NewKey("direction")
|
||||
UseFD, _ = tag.NewKey("use_fd")
|
||||
)
|
||||
|
||||
// Measures
|
||||
@ -143,6 +149,22 @@ var (
|
||||
SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
|
||||
SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
|
||||
SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
|
||||
|
||||
// rcmgr
|
||||
RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
|
||||
RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
|
||||
RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
|
||||
RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
|
||||
RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
|
||||
RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
|
||||
RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
|
||||
RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless)
|
||||
RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
|
||||
RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
|
||||
RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless)
|
||||
RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
|
||||
RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
|
||||
RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
|
||||
)
|
||||
|
||||
var (
|
||||
@ -496,6 +518,76 @@ var (
|
||||
Measure: GraphsyncSendingPeersPending,
|
||||
Aggregation: view.LastValue(),
|
||||
}
|
||||
|
||||
// rcmgr
|
||||
RcmgrAllowConnView = &view.View{
|
||||
Measure: RcmgrAllowConn,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{Direction, UseFD},
|
||||
}
|
||||
RcmgrBlockConnView = &view.View{
|
||||
Measure: RcmgrBlockConn,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{Direction, UseFD},
|
||||
}
|
||||
RcmgrAllowStreamView = &view.View{
|
||||
Measure: RcmgrAllowStream,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{PeerID, Direction},
|
||||
}
|
||||
RcmgrBlockStreamView = &view.View{
|
||||
Measure: RcmgrBlockStream,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{PeerID, Direction},
|
||||
}
|
||||
RcmgrAllowPeerView = &view.View{
|
||||
Measure: RcmgrAllowPeer,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{PeerID},
|
||||
}
|
||||
RcmgrBlockPeerView = &view.View{
|
||||
Measure: RcmgrBlockPeer,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{PeerID},
|
||||
}
|
||||
RcmgrAllowProtoView = &view.View{
|
||||
Measure: RcmgrAllowProto,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ProtocolID},
|
||||
}
|
||||
RcmgrBlockProtoView = &view.View{
|
||||
Measure: RcmgrBlockProto,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ProtocolID},
|
||||
}
|
||||
RcmgrBlockProtoPeerView = &view.View{
|
||||
Measure: RcmgrBlockProtoPeer,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ProtocolID, PeerID},
|
||||
}
|
||||
RcmgrAllowSvcView = &view.View{
|
||||
Measure: RcmgrAllowSvc,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ServiceID},
|
||||
}
|
||||
RcmgrBlockSvcView = &view.View{
|
||||
Measure: RcmgrBlockSvc,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ServiceID},
|
||||
}
|
||||
RcmgrBlockSvcPeerView = &view.View{
|
||||
Measure: RcmgrBlockSvcPeer,
|
||||
Aggregation: view.Count(),
|
||||
TagKeys: []tag.Key{ServiceID, PeerID},
|
||||
}
|
||||
RcmgrAllowMemView = &view.View{
|
||||
Measure: RcmgrAllowMem,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
RcmgrBlockMemView = &view.View{
|
||||
Measure: RcmgrBlockMem,
|
||||
Aggregation: view.Count(),
|
||||
}
|
||||
)
|
||||
|
||||
// DefaultViews is an array of OpenCensus views for metric gathering purposes
|
||||
@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View {
|
||||
GraphsyncSendingTotalMemoryAllocatedView,
|
||||
GraphsyncSendingTotalPendingAllocationsView,
|
||||
GraphsyncSendingPeersPendingView,
|
||||
|
||||
RcmgrAllowConnView,
|
||||
RcmgrBlockConnView,
|
||||
RcmgrAllowStreamView,
|
||||
RcmgrBlockStreamView,
|
||||
RcmgrAllowPeerView,
|
||||
RcmgrBlockPeerView,
|
||||
RcmgrAllowProtoView,
|
||||
RcmgrBlockProtoView,
|
||||
RcmgrBlockProtoPeerView,
|
||||
RcmgrAllowSvcView,
|
||||
RcmgrBlockSvcView,
|
||||
RcmgrBlockSvcPeerView,
|
||||
RcmgrAllowMemView,
|
||||
RcmgrBlockMemView,
|
||||
}
|
||||
views = append(views, blockstore.DefaultViews...)
|
||||
views = append(views, rpcmetrics.DefaultViews...)
|
||||
|
@ -139,6 +139,7 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
AllowUnseal: true,
|
||||
AllowReplicaUpdate: true,
|
||||
AllowProveReplicaUpdate2: true,
|
||||
AllowRegenSectorKey: true,
|
||||
|
||||
// Default to 10 - tcp should still be able to figure this out, and
|
||||
// it's the ratio between 10gbit / 1gbit
|
||||
|
@ -11,9 +11,15 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
|
||||
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
|
||||
"go.opencensus.io/stats"
|
||||
"go.opencensus.io/tag"
|
||||
)
|
||||
|
||||
func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
|
||||
@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
|
||||
// TODO: also set appropriate default limits for lotus protocols
|
||||
libp2p.SetDefaultServiceLimits(limiter)
|
||||
|
||||
opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
|
||||
|
||||
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
|
||||
debugPath := filepath.Join(repoPath, "debug")
|
||||
if err := os.MkdirAll(debugPath, 0755); err != nil {
|
||||
@ -70,3 +78,109 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
|
||||
Opts: []libp2p.Option{libp2p.ResourceManager(mgr)},
|
||||
}
|
||||
}
|
||||
|
||||
type rcmgrMetrics struct{}
|
||||
|
||||
func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
|
||||
ctx := context.Background()
|
||||
if dir == network.DirInbound {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
|
||||
}
|
||||
if usefd {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
|
||||
}
|
||||
stats.Record(ctx, metrics.RcmgrAllowConn.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
|
||||
ctx := context.Background()
|
||||
if dir == network.DirInbound {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
|
||||
}
|
||||
if usefd {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
|
||||
}
|
||||
stats.Record(ctx, metrics.RcmgrBlockConn.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
|
||||
ctx := context.Background()
|
||||
if dir == network.DirInbound {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
|
||||
}
|
||||
stats.Record(ctx, metrics.RcmgrAllowStream.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
|
||||
ctx := context.Background()
|
||||
if dir == network.DirInbound {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
|
||||
} else {
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
|
||||
}
|
||||
stats.Record(ctx, metrics.RcmgrBlockStream.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) AllowPeer(p peer.ID) {
|
||||
ctx := context.Background()
|
||||
stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockPeer(p peer.ID) {
|
||||
ctx := context.Background()
|
||||
stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
|
||||
stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
|
||||
stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
|
||||
stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) AllowService(svc string) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
|
||||
stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockService(svc string) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
|
||||
stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
|
||||
ctx := context.Background()
|
||||
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
|
||||
stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) AllowMemory(size int) {
|
||||
stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1))
|
||||
}
|
||||
|
||||
func (r rcmgrMetrics) BlockMemory(size int) {
|
||||
stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user