diff --git a/.circleci/config.yml b/.circleci/config.yml index 1614daf8e..f0262e5b6 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -850,6 +850,11 @@ workflows: suite: itest-get_messages_in_ts target: "./itests/get_messages_in_ts_test.go" + - test: + name: test-itest-mempool + suite: itest-mempool + target: "./itests/mempool_test.go" + - test: name: test-itest-multisig suite: itest-multisig diff --git a/Makefile b/Makefile index f7b13cc18..f91e74e33 100644 --- a/Makefile +++ b/Makefile @@ -345,6 +345,8 @@ gen: actors-gen type-gen method-gen cfgdoc-gen docsgen api-gen circleci @echo ">>> IF YOU'VE MODIFIED THE CLI OR CONFIG, REMEMBER TO ALSO MAKE docsgen-cli" .PHONY: gen +jen: gen + snap: lotus lotus-miner lotus-worker snapcraft # snapcraft upload ./lotus_*.snap diff --git a/blockstore/context.go b/blockstore/context.go new file mode 100644 index 000000000..ebb6fafe3 --- /dev/null +++ b/blockstore/context.go @@ -0,0 +1,21 @@ +package blockstore + +import ( + "context" +) + +type hotViewKey struct{} + +var hotView = hotViewKey{} + +// WithHotView constructs a new context with an option that provides a hint to the blockstore +// (e.g. the splitstore) that the object (and its ipld references) should be kept hot. +func WithHotView(ctx context.Context) context.Context { + return context.WithValue(ctx, hotView, struct{}{}) +} + +// IsHotView returns true if the hot view option is set in the context +func IsHotView(ctx context.Context) bool { + v := ctx.Value(hotView) + return v != nil +} diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 6a65e01df..a351df76a 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -161,6 +161,13 @@ type SplitStore struct { txnSyncCond sync.Cond txnSync bool + // background cold object reification + reifyWorkers sync.WaitGroup + reifyMx sync.Mutex + reifyCond sync.Cond + reifyPend map[cid.Cid]struct{} + reifyInProgress map[cid.Cid]struct{} + // registered protectors protectors []func(func(cid.Cid) error) error } @@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co ss.txnSyncCond.L = &ss.txnSyncMx ss.ctx, ss.cancel = context.WithCancel(context.Background()) + ss.reifyCond.L = &ss.reifyMx + ss.reifyPend = make(map[cid.Cid]struct{}) + ss.reifyInProgress = make(map[cid.Cid]struct{}) + if enableDebugLog { ss.debug, err = openDebugLog(path) if err != nil { @@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) { return true, nil } - return s.cold.Has(ctx, cid) + has, err = s.cold.Has(ctx, cid) + if has && bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + + return has, err + } func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) { @@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) blk, err = s.cold.Get(ctx, cid) if err == nil { - stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return blk, err @@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { size, err = s.cold.GetSize(ctx, cid) if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return size, err @@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro err = s.cold.View(ctx, cid, cb) 
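A note on how the new hot-view hint is consumed: callers that execute chain state (see the ApplyBlocks change further down) tag their context once with WithHotView, and every blockstore access made under that context inherits the hint. A minimal, self-contained sketch of that flow; the fetch helper is illustrative only, standing in for the splitstore accessors patched above.

```go
package main

import (
	"context"
	"fmt"
)

// Self-contained restatement of the helpers added in blockstore/context.go.
type hotViewKey struct{}

var hotView = hotViewKey{}

func WithHotView(ctx context.Context) context.Context {
	return context.WithValue(ctx, hotView, struct{}{})
}

func IsHotView(ctx context.Context) bool {
	return ctx.Value(hotView) != nil
}

// fetch stands in for any splitstore accessor (Has/Get/GetSize/View): on a
// cold-store hit it only schedules reification when the hint is present.
func fetch(ctx context.Context, label string) {
	if IsHotView(ctx) {
		fmt.Println(label, "-> cold hit on a hot view, schedule reification")
		return
	}
	fmt.Println(label, "-> cold hit, leave the object cold")
}

func main() {
	fetch(context.Background(), "plain read")
	// Tag the context once; every nested read inherits the hint.
	fetch(WithHotView(context.Background()), "chain-execution read")
}
```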
if err == nil { + if bstore.IsHotView(ctx) { + s.reifyColdObject(cid) + } + stats.Record(s.ctx, metrics.SplitstoreMiss.M(1)) } return err @@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error } } + // spawn the reifier + go s.reifyOrchestrator() + // watch the chain chain.SubscribeHeadChanges(s.HeadChange) @@ -676,6 +707,8 @@ func (s *SplitStore) Close() error { } } + s.reifyCond.Broadcast() + s.reifyWorkers.Wait() s.cancel() return multierr.Combine(s.markSetEnv.Close(), s.debug.Close()) } diff --git a/blockstore/splitstore/splitstore_reify.go b/blockstore/splitstore/splitstore_reify.go new file mode 100644 index 000000000..652900747 --- /dev/null +++ b/blockstore/splitstore/splitstore_reify.go @@ -0,0 +1,203 @@ +package splitstore + +import ( + "runtime" + "sync/atomic" + + "golang.org/x/xerrors" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +var EnableReification = false + +func (s *SplitStore) reifyColdObject(c cid.Cid) { + if !EnableReification { + return + } + + if !s.isWarm() { + return + } + + if isUnitaryObject(c) { + return + } + + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + _, ok := s.reifyInProgress[c] + if ok { + return + } + + s.reifyPend[c] = struct{}{} + s.reifyCond.Broadcast() +} + +func (s *SplitStore) reifyOrchestrator() { + workers := runtime.NumCPU() / 4 + if workers < 2 { + workers = 2 + } + + workch := make(chan cid.Cid, workers) + defer close(workch) + + for i := 0; i < workers; i++ { + s.reifyWorkers.Add(1) + go s.reifyWorker(workch) + } + + for { + s.reifyMx.Lock() + for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 { + s.reifyCond.Wait() + } + + if atomic.LoadInt32(&s.closing) != 0 { + s.reifyMx.Unlock() + return + } + + reifyPend := s.reifyPend + s.reifyPend = make(map[cid.Cid]struct{}) + s.reifyMx.Unlock() + + for c := range reifyPend { + select { + case workch <- c: + case <-s.ctx.Done(): + return + } + } + } +} + +func (s *SplitStore) reifyWorker(workch chan cid.Cid) { + defer s.reifyWorkers.Done() + for c := range workch { + s.doReify(c) + } +} + +func (s *SplitStore) doReify(c cid.Cid) { + var toreify, totrack, toforget []cid.Cid + + defer func() { + s.reifyMx.Lock() + defer s.reifyMx.Unlock() + + for _, c := range toreify { + delete(s.reifyInProgress, c) + } + for _, c := range totrack { + delete(s.reifyInProgress, c) + } + for _, c := range toforget { + delete(s.reifyInProgress, c) + } + }() + + s.txnLk.RLock() + defer s.txnLk.RUnlock() + + err := s.walkObjectIncomplete(c, newTmpVisitor(), + func(c cid.Cid) error { + if isUnitaryObject(c) { + return errStopWalk + } + + s.reifyMx.Lock() + _, inProgress := s.reifyInProgress[c] + if !inProgress { + s.reifyInProgress[c] = struct{}{} + } + s.reifyMx.Unlock() + + if inProgress { + return errStopWalk + } + + has, err := s.hot.Has(s.ctx, c) + if err != nil { + return xerrors.Errorf("error checking hotstore: %w", err) + } + + if has { + if s.txnMarkSet != nil { + hasMark, err := s.txnMarkSet.Has(c) + if err != nil { + log.Warnf("error checking markset: %s", err) + } else if hasMark { + toforget = append(toforget, c) + return errStopWalk + } + } else { + totrack = append(totrack, c) + return errStopWalk + } + } + + toreify = append(toreify, c) + return nil + }, + func(missing cid.Cid) error { + log.Warnf("missing reference while reifying %s: %s", c, missing) + return errStopWalk + }) + + if err != nil { + log.Warnf("error walking cold object for reification (cid: %s): %s", c, err) + return + } + + log.Debugf("reifying 
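The orchestrator above follows a standard Go dispatch shape: a mutex-and-cond guarded pending set is swapped out wholesale under the lock and drained into a bounded channel feeding a small worker pool, so producers never block on slow workers. A condensed, self-contained sketch of that shape (plain strings instead of CIDs, a simple closing flag instead of the splitstore's lifecycle plumbing):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// dispatcher condenses the reifyColdObject/reifyOrchestrator/reifyWorker trio.
type dispatcher struct {
	mx      sync.Mutex
	cond    sync.Cond
	pend    map[string]struct{}
	closing bool
	wg      sync.WaitGroup
}

func newDispatcher() *dispatcher {
	d := &dispatcher{pend: make(map[string]struct{})}
	d.cond.L = &d.mx
	return d
}

// enqueue mirrors reifyColdObject: record the item, wake the orchestrator.
func (d *dispatcher) enqueue(item string) {
	d.mx.Lock()
	defer d.mx.Unlock()
	d.pend[item] = struct{}{}
	d.cond.Broadcast()
}

// run mirrors reifyOrchestrator: wait for work, swap the pending set out
// under the lock, then feed workers without holding the lock.
func (d *dispatcher) run(work func(string)) {
	workers := runtime.NumCPU() / 4
	if workers < 2 {
		workers = 2
	}
	ch := make(chan string, workers)
	defer close(ch)

	for i := 0; i < workers; i++ {
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for item := range ch {
				work(item)
			}
		}()
	}

	for {
		d.mx.Lock()
		for len(d.pend) == 0 && !d.closing {
			d.cond.Wait()
		}
		if d.closing {
			d.mx.Unlock()
			return
		}
		batch := d.pend
		d.pend = make(map[string]struct{})
		d.mx.Unlock()

		for item := range batch {
			ch <- item
		}
	}
}

func main() {
	d := newDispatcher()
	go d.run(func(item string) { fmt.Println("reifying", item) })
	d.enqueue("bafy...a")
	d.enqueue("bafy...b")
	time.Sleep(100 * time.Millisecond) // crude wait, demo only
}
```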
%d objects rooted at %s", len(toreify), c) + + // this should not get too big, maybe some 100s of objects. + batch := make([]blocks.Block, 0, len(toreify)) + for _, c := range toreify { + blk, err := s.cold.Get(s.ctx, c) + if err != nil { + log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err) + continue + } + + if err := s.checkClosing(); err != nil { + return + } + + batch = append(batch, blk) + } + + if len(batch) > 0 { + err = s.hot.PutMany(s.ctx, batch) + if err != nil { + log.Warnf("error reifying cold object (cid: %s): %s", c, err) + return + } + } + + if s.txnMarkSet != nil { + if len(toreify) > 0 { + if err := s.txnMarkSet.MarkMany(toreify); err != nil { + log.Warnf("error marking reified objects: %s", err) + } + } + if len(totrack) > 0 { + if err := s.txnMarkSet.MarkMany(totrack); err != nil { + log.Warnf("error marking tracked objects: %s", err) + } + } + } else { + // if txnActive is false these are noops + if len(toreify) > 0 { + s.trackTxnRefMany(toreify) + } + if len(totrack) > 0 { + s.trackTxnRefMany(totrack) + } + } +} diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 27d58bf10..c7f9cb6fc 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/ioutil" + "math/rand" "os" "sync" "sync/atomic" @@ -387,6 +388,136 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) { } } +func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + hot := newMockStore() + cold := newMockStore() + + mkRandomBlock := func() blocks.Block { + data := make([]byte, 128) + _, err := rand.Read(data) + if err != nil { + t.Fatal(err) + } + + return blocks.NewBlock(data) + } + + block1 := mkRandomBlock() + block2 := mkRandomBlock() + block3 := mkRandomBlock() + + hdr := mock.MkBlock(nil, 0, 0) + hdr.Messages = block1.Cid() + hdr.ParentMessageReceipts = block2.Cid() + hdr.ParentStateRoot = block3.Cid() + block4, err := hdr.ToStorageBlock() + if err != nil { + t.Fatal(err) + } + + allBlocks := []blocks.Block{block1, block2, block3, block4} + for _, blk := range allBlocks { + err := cold.Put(context.Background(), blk) + if err != nil { + t.Fatal(err) + } + } + + path, err := ioutil.TempDir("", "splitstore.*") + if err != nil { + t.Fatal(err) + } + + t.Cleanup(func() { + _ = os.RemoveAll(path) + }) + + ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"}) + if err != nil { + t.Fatal(err) + } + defer ss.Close() //nolint + + ss.warmupEpoch = 1 + go ss.reifyOrchestrator() + + waitForReification := func() { + for { + ss.reifyMx.Lock() + ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0 + ss.reifyMx.Unlock() + + if ready { + return + } + + time.Sleep(time.Millisecond) + } + } + + // first access using the standard view + err = f(context.Background(), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // nothing should be reified + waitForReification() + for _, blk := range allBlocks { + has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("block unexpectedly reified") + } + } + + // now make the hot/reifying view and ensure access reifies + err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid()) + if err != nil { + t.Fatal(err) + } + + // everything should be reified + waitForReification() + for i, blk := range allBlocks { + 
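doReify prunes its DAG walk with a sentinel error rather than aborting it: returning errStopWalk from the visitor skips a subtree (an already-hot or unitary object) while any other error fails the whole walk. A minimal sketch of that contract, using an illustrative tree walker rather than the splitstore's actual walkObjectIncomplete:

```go
package main

import (
	"errors"
	"fmt"
)

var errStopWalk = errors.New("stop walk") // sentinel: prune this subtree, keep walking

type node struct {
	name     string
	children []*node
}

// walk visits n and recurses into its children. The visitor may return
// errStopWalk to skip a subtree; any other error aborts the whole walk.
func walk(n *node, visit func(*node) error) error {
	if err := visit(n); err != nil {
		if errors.Is(err, errStopWalk) {
			return nil // prune, but don't fail
		}
		return err
	}
	for _, c := range n.children {
		if err := walk(c, visit); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	root := &node{name: "root", children: []*node{
		{name: "already-hot", children: []*node{{name: "skipped"}}},
		{name: "cold"},
	}}
	_ = walk(root, func(n *node) error {
		if n.name == "already-hot" {
			return errStopWalk // like an object already in the hotstore
		}
		fmt.Println("reify:", n.name)
		return nil
	})
}
```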
has, err := hot.Has(context.Background(), blk.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatalf("block%d was not reified", i+1) + } + } +} + +func TestSplitStoreReification(t *testing.T) { + EnableReification = true + t.Log("test reification with Has") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Has(ctx, c) + return err + }) + t.Log("test reification with Get") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.Get(ctx, c) + return err + }) + t.Log("test reification with GetSize") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + _, err := s.GetSize(ctx, c) + return err + }) + t.Log("test reification with View") + testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error { + return s.View(ctx, c, func(_ []byte) error { return nil }) + }) +} + type mockChain struct { t testing.TB diff --git a/chain/consensus/filcns/compute_state.go b/chain/consensus/filcns/compute_state.go index 28bdc0eb3..929ac9f2f 100644 --- a/chain/consensus/filcns/compute_state.go +++ b/chain/consensus/filcns/compute_state.go @@ -33,6 +33,7 @@ import ( /* inline-gen end */ + "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin" @@ -93,6 +94,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, sm *stmgr.StateManager partDone() }() + ctx = blockstore.WithHotView(ctx) makeVmWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (vm.VMI, error) { filVested, err := sm.GetFilVested(ctx, e) if err != nil { diff --git a/chain/consensus/filcns/upgrades.go b/chain/consensus/filcns/upgrades.go index 2fa020d3d..116684b9f 100644 --- a/chain/consensus/filcns/upgrades.go +++ b/chain/consensus/filcns/upgrades.go @@ -165,13 +165,8 @@ func DefaultUpgradeSchedule() stmgr.UpgradeSchedule { Migration: UpgradeActorsV7, PreMigrations: []stmgr.PreMigration{{ PreMigration: PreUpgradeActorsV7, - StartWithin: 120, + StartWithin: 180, DontStartWithin: 60, - StopWithin: 35, - }, { - PreMigration: PreUpgradeActorsV7, - StartWithin: 30, - DontStartWithin: 15, StopWithin: 5, }}, Expensive: true, @@ -1264,7 +1259,7 @@ func upgradeActorsV7Common( root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, config nv15.Config, ) (cid.Cid, error) { - writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB) + writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB/4) // TODO: pretty sure we'd achieve nothing by doing this, confirm in review //buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore) store := store.ActorStore(ctx, writeStore) diff --git a/chain/messagepool/check_test.go b/chain/messagepool/check_test.go new file mode 100644 index 000000000..ffcac74e5 --- /dev/null +++ b/chain/messagepool/check_test.go @@ -0,0 +1,224 @@ +//stm: #unit +package messagepool + +import ( + "context" + "fmt" + "testing" + + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/assert" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/consensus/filcns" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/filecoin-project/lotus/chain/wallet" + _ 
"github.com/filecoin-project/lotus/lib/sigs/bls" + _ "github.com/filecoin-project/lotus/lib/sigs/secp" +) + +func init() { + _ = logging.SetLogLevel("*", "INFO") +} + +func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) { + for i := 0; i < len(msgStatuses); i++ { + iMsgStatuses := msgStatuses[i] + if iMsgStatuses.CheckStatus.Code == statusCode { + return &iMsgStatuses, nil + } + } + return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode) +} + +func TestCheckMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + var protos []*api.MessagePrototype + for i := 0; i < 5; i++ { + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: uint64(i), + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + proto := &api.MessagePrototype{ + Message: *msg, + ValidNonce: true, + } + protos = append(protos, proto) + } + + messageStatuses, err := mp.CheckMessages(context.TODO(), protos) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckPendingMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender) + assert.NoError(t, err) + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + for j := 0; j < len(iMsgStatuses); j++ { + jStatus := iMsgStatuses[i] + assert.True(t, jStatus.OK) + } + } +} + +func TestCheckReplaceMessages(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + if err != nil { + t.Fatal(err) + } + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + if err != nil { + t.Fatal(err) + } + + 
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1) + if err != nil { + t.Fatal(err) + } + + tma.setBalance(sender, 1000e15) + target := mock.Address(1001) + + // add a valid message to the pool + msg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 2<<10), + } + + sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + mustAdd(t, mp, sm) + + // create a new message with the same data, except that it is too big + var msgs []*types.Message + invalidmsg := &types.Message{ + To: target, + From: sender, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 128<<10), + } + msgs = append(msgs, invalidmsg) + + { + messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs) + if err != nil { + t.Fatal(err) + } + for i := 0; i < len(messageStatuses); i++ { + iMsgStatuses := messageStatuses[i] + + status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses) + if err != nil { + t.Fatal(err) + } + // the replacement message should cause a status error + assert.False(t, status.OK) + } + } + +} diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 6bd60da34..d7f075aab 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" @@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) { } func TestMessagePool(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_NONCE_001 + tma := newTestMpoolAPI() w, err := wallet.NewWallet(wallet.NewMemKeyStore()) @@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) { Message: *msg, Signature: *sig, } + //stm: @CHAIN_MEMPOOL_PUSH_001 err = mp.Add(context.TODO(), sm) assert.ErrorIs(t, err, ErrMessageTooBig) } @@ -760,3 +764,302 @@ func TestUpdates(t *testing.T) { t.Fatal("expected closed channel, but got an update instead") } } + +func TestMessageBelowMinGasFee(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + // fee is just below minimum gas fee + fee := minimumBaseFee.Uint64() - 1 + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(fee), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + err = mp.Add(context.TODO(), sm) + assert.ErrorIs(t, err, 
ErrGasFeeCapTooLow) + } +} + +func TestMessageValueTooHigh(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + totalFil := types.TotalFilecoinInt + extra := types.NewInt(1) + + value := types.BigAdd(totalFil, extra) + { + msg := &types.Message{ + To: to, + From: from, + Value: value, + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{}) + if err != nil { + panic(err) + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *sig, + } + + err = mp.Add(context.TODO(), sm) + assert.Error(t, err) + } +} + +func TestMessageSignatureInvalid(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + msg := &types.Message{ + To: to, + From: from, + Value: types.NewInt(1), + Nonce: 0, + GasLimit: 50000000, + GasFeeCap: types.NewInt(minimumBaseFee.Uint64()), + GasPremium: types.NewInt(1), + Params: make([]byte, 32<<10), + } + + badSig := &crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: make([]byte, 0), + } + sm := &types.SignedMessage{ + Message: *msg, + Signature: *badSig, + } + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "invalid signature length") + assert.Error(t, err) + } +} + +func TestAddMessageTwice(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create a valid message + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // try to add it twice + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "with nonce 0 already in mpool") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceNonceGap(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + // create message with invalid nonce (1) + sm := makeTestMessage(w, from,
to, 1, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // then try to add message again + err = mp.Add(context.TODO(), sm) + // assert.Contains(t, err.Error(), "unfulfilled nonce gap") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiff(t *testing.T) { + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64()) + + //stm: @CHAIN_MEMPOOL_PUSH_001 + // then try to add message again + err = mp.Add(context.TODO(), sm2) + // assert.Contains(t, err.Error(), "replace by fee has too low GasPremium") + assert.Error(t, err) + } +} + +func TestAddMessageTwiceCidDiffReplaced(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + // Create message with different data, so CID is different + sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2) + mustAdd(t, mp, sm2) + } +} + +func TestRemoveMessage(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001 + tma := newTestMpoolAPI() + + w, err := wallet.NewWallet(wallet.NewMemKeyStore()) + assert.NoError(t, err) + + from, err := w.WalletNew(context.Background(), types.KTBLS) + assert.NoError(t, err) + + tma.setBalance(from, 1000e9) + + ds := datastore.NewMapDatastore() + + mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil) + assert.NoError(t, err) + + to := mock.Address(1001) + + { + sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()) + mustAdd(t, mp, sm) + + //stm: @CHAIN_MEMPOOL_REMOVE_001 + // remove message for sender + mp.Remove(context.TODO(), from, sm.Message.Nonce, true) + + //stm: @CHAIN_MEMPOOL_PENDING_FOR_001 + // check messages in pool: should be none present + msgs := mp.pendingFor(context.TODO(), from) + assert.Len(t, msgs, 0) + } +} diff --git a/chain/messagepool/repub_test.go b/chain/messagepool/repub_test.go index de32eaa6b..18a75d881 100644 --- a/chain/messagepool/repub_test.go +++ b/chain/messagepool/repub_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -16,6 +17,7 @@ import ( ) func TestRepubMessages(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 oldRepublishBatchDelay := RepublishBatchDelay RepublishBatchDelay = time.Microsecond defer func() { @@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) { for i := 0; i < 10; i++ { m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1)) + //stm: @CHAIN_MEMPOOL_PUSH_001 _, err := mp.Push(context.TODO(), m) if err != nil { t.Fatal(err) diff --git 
a/chain/messagepool/selection_test.go b/chain/messagepool/selection_test.go index 2ae99cd77..e97d5208e 100644 --- a/chain/messagepool/selection_test.go +++ b/chain/messagepool/selection_test.go @@ -1,3 +1,4 @@ +//stm: #unit package messagepool import ( @@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) { } func TestMessageChains(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001 + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 mp, tma := makeTestMpool() // the actors @@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) { } func TestMessageChainSkipping(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001 + // regression test for chain skip bug mp, tma := makeTestMpool() @@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) { } func TestBasicMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 oldMaxNonceGap := MaxNonceGap MaxNonceGap = 1000 defer func() { @@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) { } func TestMessageSelectionTrimmingGas(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) { } func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) { } func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) { } func TestPriorityMessageSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) { } func TestPriorityMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) { } func TestPriorityMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 mp, tma := makeTestMpool() // the actors @@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) { } func TestOptimalMessageSelection1(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses just a single actor sending messages with a low tq // the chain depenent merging algorithm should pick messages from the actor // from the start @@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) { } func TestOptimalMessageSelection2(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses two actors sending messages to each other, with the first // actor paying (much) higher gas premium than the second. 
// We select with a low ticket quality; the chain depenent merging algorithm should pick @@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) { } func TestOptimalMessageSelection3(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + // this test uses 10 actors sending a block of messages to each other, with the the first // actors paying higher gas premium than the subsequent actors. // We select with a low ticket quality; the chain dependent merging algorithm should pick @@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 { } func TestCompetitiveMessageSelectionExp(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) { } func TestCompetitiveMessageSelectionZipf(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001 + if testing.Short() { t.Skip("skipping in short mode") } @@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) { } func TestGasReward(t *testing.T) { + //stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001 tests := []struct { Premium uint64 FeeCap uint64 @@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) { } func TestRealWorldSelection(t *testing.T) { + //stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001 + // load test-messages.json.gz and rewrite the messages so that // 1) we map each real actor to a test actor so that we can sign the messages // 2) adjust the nonces so that they start from 0 diff --git a/cmd/lotus-miner/info.go b/cmd/lotus-miner/info.go index 1133908ca..46070ca96 100644 --- a/cmd/lotus-miner/info.go +++ b/cmd/lotus-miner/info.go @@ -126,7 +126,7 @@ func infoCmdAct(cctx *cli.Context) error { alerts, err := minerApi.LogAlerts(ctx) if err != nil { - return xerrors.Errorf("getting alerts: %w", err) + fmt.Printf("ERROR: getting alerts: %s\n", err) } activeAlerts := make([]alerting.Alert, 0) diff --git a/cmd/lotus-miner/info_all.go b/cmd/lotus-miner/info_all.go index bd1147b22..fa5a413e7 100644 --- a/cmd/lotus-miner/info_all.go +++ b/cmd/lotus-miner/info_all.go @@ -96,6 +96,11 @@ var infoAllCmd = &cli.Command{ fmt.Println("ERROR: ", err) } + fmt.Println("\n#: Storage Locks") + if err := storageLocks.Action(cctx); err != nil { + fmt.Println("ERROR: ", err) + } + fmt.Println("\n#: Sched Diag") if err := sealingSchedDiagCmd.Action(cctx); err != nil { fmt.Println("ERROR: ", err) @@ -192,6 +197,11 @@ var infoAllCmd = &cli.Command{ fmt.Println("ERROR: ", err) } + fmt.Println("\n#: Storage Sector List") + if err := storageListSectorsCmd.Action(cctx); err != nil { + fmt.Println("ERROR: ", err) + } + fmt.Println("\n#: Expired Sectors") if err := sectorsExpiredCmd.Action(cctx); err != nil { fmt.Println("ERROR: ", err) diff --git a/cmd/lotus-miner/init.go b/cmd/lotus-miner/init.go index ae742c663..59ea75b10 100644 --- a/cmd/lotus-miner/init.go +++ b/cmd/lotus-miner/init.go @@ -467,12 +467,15 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{}) smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{ - ParallelFetchLimit: 10, - AllowAddPiece: true, - AllowPreCommit1: true, - AllowPreCommit2: true, - AllowCommit: true, - AllowUnseal: true, + ParallelFetchLimit: 10, + AllowAddPiece: true, + AllowPreCommit1: true, + AllowPreCommit2: true, + 
AllowCommit: true, + AllowUnseal: true, + AllowReplicaUpdate: true, + AllowProveReplicaUpdate2: true, + AllowRegenSectorKey: true, }, wsts, smsts) if err != nil { return err } diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index c779f5a8b..d8c3e9c7c 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -161,7 +161,7 @@ var sectorsStatusCmd = &cli.Command{ fmt.Printf("Expiration:\t\t%v\n", status.Expiration) fmt.Printf("DealWeight:\t\t%v\n", status.DealWeight) fmt.Printf("VerifiedDealWeight:\t\t%v\n", status.VerifiedDealWeight) - fmt.Printf("InitialPledge:\t\t%v\n", status.InitialPledge) + fmt.Printf("InitialPledge:\t\t%v\n", types.FIL(status.InitialPledge)) fmt.Printf("\nExpiration Info\n") fmt.Printf("OnTime:\t\t%v\n", status.OnTime) fmt.Printf("Early:\t\t%v\n", status.Early) @@ -294,8 +294,14 @@ var sectorsListCmd = &cli.Command{ Aliases: []string{"e"}, }, &cli.BoolFlag{ - Name: "seal-time", - Usage: "display how long it took for the sector to be sealed", + Name: "initial-pledge", + Usage: "display initial pledge", + Aliases: []string{"p"}, + }, + &cli.BoolFlag{ + Name: "seal-time", + Usage: "display how long it took for the sector to be sealed", + Aliases: []string{"t"}, }, &cli.StringFlag{ Name: "states", @@ -405,6 +411,7 @@ var sectorsListCmd = &cli.Command{ tablewriter.Col("Deals"), tablewriter.Col("DealWeight"), tablewriter.Col("VerifiedPower"), + tablewriter.Col("Pledge"), tablewriter.NewLineCol("Error"), tablewriter.NewLineCol("RecoveryTimeout")) @@ -483,6 +490,9 @@ var sectorsListCmd = &cli.Command{ m["RecoveryTimeout"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early)) } } + if inSSet && cctx.Bool("initial-pledge") { + m["Pledge"] = types.FIL(st.InitialPledge).Short() + } } if !fast && deals > 0 { diff --git a/cmd/lotus-miner/storage.go b/cmd/lotus-miner/storage.go index 6f7a627f6..0fea2a3a5 100644 --- a/cmd/lotus-miner/storage.go +++ b/cmd/lotus-miner/storage.go @@ -368,6 +368,7 @@ type storedSector struct { store stores.SectorStorageInfo unsealed, sealed, cache bool + update, updatecache bool } var storageFindCmd = &cli.Command{ @@ -421,6 +422,16 @@ return xerrors.Errorf("finding cache: %w", err) } + us, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdate, 0, false) + if err != nil { + return xerrors.Errorf("finding update: %w", err) + } + + uc, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdateCache, 0, false) + if err != nil { + return xerrors.Errorf("finding update-cache: %w", err) + } + byId := map[stores.ID]*storedSector{} for _, info := range u { sts, ok := byId[info.ID] @@ -455,6 +466,28 @@ var storageFindCmd = &cli.Command{ } sts.cache = true } + for _, info := range us { + sts, ok := byId[info.ID] + if !ok { + sts = &storedSector{ + id: info.ID, + store: info, + } + byId[info.ID] = sts + } + sts.update = true + } + for _, info := range uc { + sts, ok := byId[info.ID] + if !ok { + sts = &storedSector{ + id: info.ID, + store: info, + } + byId[info.ID] = sts + } + sts.updatecache = true + } local, err := nodeApi.StorageLocal(ctx) if err != nil { @@ -480,6 +513,12 @@ var storageFindCmd = &cli.Command{ if info.cache { types += "Cache, " } + if info.update { + types += "Update, " + } + if info.updatecache { + types += "UpdateCache, " + } fmt.Printf("In %s (%s)\n", info.id, types[:len(types)-2]) fmt.Printf("\tSealing: %t; Storage: %t\n", info.store.CanSeal, info.store.CanStore) diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index
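The storageFindCmd changes above extend an accumulation loop that is now copy-pasted five times (unsealed, sealed, cache, update, update-cache). A table-driven pass would keep each file type and the flag it sets in one place; a self-contained sketch with stand-in types (sectorInfo and find are illustrative, not the real stores API):

```go
package main

import "fmt"

type sectorInfo struct{ ID string }

type storedSector struct {
	id                      string
	unsealed, sealed, cache bool
	update, updatecache     bool
}

func main() {
	// find stands in for nodeApi.StorageFindSector for a given file type.
	find := func(ft string) []sectorInfo {
		return []sectorInfo{{ID: "store-1"}}
	}

	byId := map[string]*storedSector{}
	for _, entry := range []struct {
		ft  string
		set func(*storedSector)
	}{
		{"unsealed", func(s *storedSector) { s.unsealed = true }},
		{"sealed", func(s *storedSector) { s.sealed = true }},
		{"cache", func(s *storedSector) { s.cache = true }},
		{"update", func(s *storedSector) { s.update = true }},
		{"update-cache", func(s *storedSector) { s.updatecache = true }},
	} {
		for _, info := range find(entry.ft) {
			sts, ok := byId[info.ID]
			if !ok {
				sts = &storedSector{id: info.ID}
				byId[info.ID] = sts
			}
			entry.set(sts)
		}
	}
	fmt.Printf("%+v\n", byId["store-1"])
}
```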
84ff1ccdd..9e6843dbf 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -173,6 +173,11 @@ var runCmd = &cli.Command{ Usage: "enable prove replica update 2", Value: true, }, + &cli.BoolFlag{ + Name: "regen-sector-key", + Usage: "enable regen sector key", + Value: true, + }, &cli.IntFlag{ Name: "parallel-fetch-limit", Usage: "maximum fetch operations to run in parallel", @@ -278,12 +283,15 @@ var runCmd = &cli.Command{ if cctx.Bool("commit") { taskTypes = append(taskTypes, sealtasks.TTCommit2) } - if cctx.Bool("replicaupdate") { + if cctx.Bool("replica-update") { taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate) } if cctx.Bool("prove-replica-update2") { taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2) } + if cctx.Bool("regen-sector-key") { + taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey) + } if len(taskTypes) == 0 { return xerrors.Errorf("no task types specified") diff --git a/cmd/lotus-seal-worker/tasks.go b/cmd/lotus-seal-worker/tasks.go index 02e5d6cfd..52133d09d 100644 --- a/cmd/lotus-seal-worker/tasks.go +++ b/cmd/lotus-seal-worker/tasks.go @@ -22,11 +22,14 @@ var tasksCmd = &cli.Command{ } var allowSetting = map[sealtasks.TaskType]struct{}{ - sealtasks.TTAddPiece: {}, - sealtasks.TTPreCommit1: {}, - sealtasks.TTPreCommit2: {}, - sealtasks.TTCommit2: {}, - sealtasks.TTUnseal: {}, + sealtasks.TTAddPiece: {}, + sealtasks.TTPreCommit1: {}, + sealtasks.TTPreCommit2: {}, + sealtasks.TTCommit2: {}, + sealtasks.TTUnseal: {}, + sealtasks.TTReplicaUpdate: {}, + sealtasks.TTProveReplicaUpdate2: {}, + sealtasks.TTRegenSectorKey: {}, } var settableStr = func() string { diff --git a/cmd/lotus-shed/diff.go b/cmd/lotus-shed/diff.go new file mode 100644 index 000000000..bcaa04122 --- /dev/null +++ b/cmd/lotus-shed/diff.go @@ -0,0 +1,104 @@ +package main + +import ( + "fmt" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" +) + +var diffCmd = &cli.Command{ + Name: "diff", + Usage: "diff state objects", + Subcommands: []*cli.Command{diffStateTrees}, +} + +var diffStateTrees = &cli.Command{ + Name: "state-trees", + Usage: "diff two state-trees", + ArgsUsage: "<state-tree-a> <state-tree-b>", + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + if cctx.NArg() != 2 { + return xerrors.Errorf("expected two state-tree roots") + } + + argA := cctx.Args().Get(0) + rootA, err := cid.Parse(argA) + if err != nil { + return xerrors.Errorf("first state-tree root (%q) is not a CID: %w", argA, err) + } + argB := cctx.Args().Get(1) + rootB, err := cid.Parse(argB) + if err != nil { + return xerrors.Errorf("second state-tree root (%q) is not a CID: %w", argB, err) + } + + if rootA == rootB { + fmt.Println("state trees do not differ") + return nil + } + + changedB, err := api.StateChangedActors(ctx, rootA, rootB) + if err != nil { + return err + } + changedA, err := api.StateChangedActors(ctx, rootB, rootA) + if err != nil { + return err + } + + diff := func(stateA, stateB types.Actor) { + if stateB.Code != stateA.Code { + fmt.Printf(" code: %s != %s\n", stateA.Code, stateB.Code) + } + if stateB.Head != stateA.Head { + fmt.Printf(" state: %s != %s\n", stateA.Head, stateB.Head) + } + if stateB.Nonce != stateA.Nonce { + fmt.Printf(" nonce: %d != %d\n", stateA.Nonce, stateB.Nonce) + } + if !stateB.Balance.Equals(stateA.Balance) { +
fmt.Printf(" balance: %s != %s\n", stateA.Balance, stateB.Balance) + } + } + + fmt.Printf("state differences between %s (first) and %s (second):\n\n", rootA, rootB) + for addr, stateA := range changedA { + fmt.Println(addr) + stateB, ok := changedB[addr] + if ok { + diff(stateA, stateB) + continue + } else { + fmt.Printf(" actor does not exist in second state-tree (%s)\n", rootB) + } + fmt.Println() + delete(changedB, addr) + } + for addr, stateB := range changedB { + fmt.Println(addr) + stateA, ok := changedA[addr] + if ok { + diff(stateA, stateB) + continue + } else { + fmt.Printf(" actor does not exist in first state-tree (%s)\n", rootA) + } + fmt.Println() + } + return nil + }, +} diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 9bcea7224..45fd24e18 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -68,6 +68,7 @@ func main() { sendCsvCmd, terminationsCmd, migrationsCmd, + diffCmd, } app := &cli.App{ diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 1fd3e91a3..848a9c864 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -1621,14 +1621,15 @@ USAGE: lotus-miner sectors list [command options] [arguments...] OPTIONS: - --show-removed, -r show removed sectors (default: false) - --color, -c use color in display output (default: depends on output being a TTY) - --fast, -f don't show on-chain info for better performance (default: false) - --events, -e display number of events the sector has received (default: false) - --seal-time display how long it took for the sector to be sealed (default: false) - --states value filter sectors by a comma-separated list of states - --unproven, -u only show sectors which aren't in the 'Proving' state (default: false) - --help, -h show help (default: false) + --show-removed, -r show removed sectors (default: false) + --color, -c use color in display output (default: depends on output being a TTY) + --fast, -f don't show on-chain info for better performance (default: false) + --events, -e display number of events the sector has received (default: false) + --initial-pledge, -p display initial pledge (default: false) + --seal-time, -t display how long it took for the sector to be sealed (default: false) + --states value filter sectors by a comma-separated list of states + --unproven, -u only show sectors which aren't in the 'Proving' state (default: false) + --help, -h show help (default: false) ``` diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index c74d8e8da..e610cff62 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -46,6 +46,7 @@ OPTIONS: --commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true) --replica-update enable replica update (default: true) --prove-replica-update2 enable prove replica update 2 (default: true) + --regen-sector-key enable regen sector key (default: true) --parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5) --timeout value used when 'listen' is unspecified. 
must be a valid duration recognized by golang's time.ParseDuration function (default: "30m") --help, -h show help (default: false) @@ -170,7 +171,7 @@ NAME: lotus-worker tasks enable - Enable a task type USAGE: - lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|AP] + lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] OPTIONS: --help, -h show help (default: false) @@ -183,7 +184,7 @@ NAME: lotus-worker tasks disable - Disable a task type USAGE: - lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|AP] + lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] OPTIONS: --help, -h show help (default: false) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index d8c774c75..818f0b73c 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -438,6 +438,9 @@ # env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2 #AllowProveReplicaUpdate2 = true + # env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY + #AllowRegenSectorKey = true + # env var: LOTUS_STORAGE_RESOURCEFILTERING #ResourceFiltering = "hardware" diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index fcbfa2e69..897ba4f06 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -105,6 +105,7 @@ type SealerConfig struct { AllowUnseal bool AllowReplicaUpdate bool AllowProveReplicaUpdate2 bool + AllowRegenSectorKey bool // ResourceFiltering instructs the system which resource filtering strategy // to use when evaluating tasks against this worker. An empty value defaults @@ -169,6 +170,9 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store if sc.AllowProveReplicaUpdate2 { localTasks = append(localTasks, sealtasks.TTProveReplicaUpdate2) } + if sc.AllowRegenSectorKey { + localTasks = append(localTasks, sealtasks.TTRegenSectorKey) + } wcfg := WorkerConfig{ IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled, diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 56b0677c4..dc045ded2 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -214,8 +214,13 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf if err != nil { return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} } - if si.UpdateUnsealed == nil || !commD.Equals(*si.UpdateUnsealed) { - return &ErrBadRU{xerrors.Errorf("on chain CommD differs from sector: %s != %s", commD, si.CommD)} + + if si.UpdateUnsealed == nil { + return &ErrBadRU{xerrors.New("nil UpdateUnsealed cid after replica update")} + } + + if !commD.Equals(*si.UpdateUnsealed) { + return &ErrBadRU{xerrors.Errorf("calculated CommD differs from updated replica: %s != %s", commD, *si.UpdateUnsealed)} } if si.UpdateSealed == nil { diff --git a/go.mod b/go.mod index 43d7853e1..f5d5b8be5 100644 --- a/go.mod +++ b/go.mod @@ -118,7 +118,7 @@ require ( github.com/libp2p/go-libp2p-pubsub v0.6.1 github.com/libp2p/go-libp2p-quic-transport v0.16.1 github.com/libp2p/go-libp2p-record v0.1.3 - github.com/libp2p/go-libp2p-resource-manager v0.1.3 + github.com/libp2p/go-libp2p-resource-manager v0.1.4 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 github.com/libp2p/go-libp2p-swarm v0.10.1 github.com/libp2p/go-libp2p-tls v0.3.1 diff --git a/go.sum b/go.sum index ddffca61d..e6d057fe6 100644 --- a/go.sum +++ b/go.sum @@ -1156,8 +1156,9 @@ 
github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0= github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4= github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= -github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o= github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= +github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw= +github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y= github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys= github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE= github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY= diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index 91ddc2e26..a232d82e0 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -3,6 +3,7 @@ package kit import ( "bytes" "context" + "fmt" "sync" "sync/atomic" "testing" @@ -10,6 +11,7 @@ import ( "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/lotus/api" aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" @@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool { return uint64(len(p.partitions)) == p.count(t) } -func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) { +func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) { defer func() { ret = p.done(t) }() - msg := smsg.Message if !(msg.To == bm.miner.ActorAddr) { return } @@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type return } +func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) { + + tracker := newPartitionTracker(ctx, dlinfo.Index, bm) + if !tracker.done(bm.t) { // need to wait for post + bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) + poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events + require.NoError(bm.t, err) + + // First check pending messages we'll mine this epoch + msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) + require.NoError(bm.t, err) + for _, msg := range msgs { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in mempool pending\n") + } + } + + // Account for included but not yet executed messages + for _, bc := range ts.Cids() { + msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc) + require.NoError(bm.t, err) + for _, msg := range msgs.BlsMessages { + if tracker.recordIfPost(bm.t, bm, msg) { + fmt.Printf("found post in message of prev tipset\n") + } + + } + for _, msg := range msgs.SecpkMessages { + if tracker.recordIfPost(bm.t, bm, &msg.Message) { + fmt.Printf("found post in message of prev tipset\n") + } + } + } + + // post not yet in mpool, wait for it + if !tracker.done(bm.t) { + bm.t.Logf("post missing 
from mpool, block mining suspended until it arrives") + POOL: + for { + bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) + select { + case <-ctx.Done(): + return + case evt := <-poolEvts: + bm.t.Logf("pool event: %d", evt.Type) + if evt.Type == api.MpoolAdd { + bm.t.Logf("incoming message %v", evt.Message) + if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) { + fmt.Printf("found post in mempool evt\n") + break POOL + } + } + } + } + bm.t.Logf("done waiting on mpool") + } + } +} + // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool // and everything shuts down if a post fails. It also enforces that every block mined succeeds func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { - - time.Sleep(3 * time.Second) + time.Sleep(time.Second) // wrap context in a cancellable context. ctx, bm.cancel = context.WithCancel(ctx) @@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur go func() { defer bm.wg.Done() - activeDeadlines := make(map[int]struct{}) - _ = activeDeadlines ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) wait := make(chan bool) @@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur require.NoError(bm.t, err) // read current out curr := <-chg - require.Equal(bm.t, ts.Height(), curr[0].Val.Height()) + require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?") for { select { case <-time.After(blocktime): @@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur return } nulls := atomic.SwapInt64(&bm.nextNulls, 0) - require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported") // Wake up and figure out if we are at the end of an active deadline ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) - tsk := ts.Key() - dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) + dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key()) require.NoError(bm.t, err) - if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted - - tracker := newPartitionTracker(ctx, dlinfo.Index, bm) - if !tracker.done(bm.t) { // need to wait for post - bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t)) - poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) - require.NoError(bm.t, err) - - // First check pending messages we'll mine this epoch - msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK) - require.NoError(bm.t, err) - for _, msg := range msgs { - tracker.recordIfPost(bm.t, bm, msg) - } - - // post not yet in mpool, wait for it - if !tracker.done(bm.t) { - bm.t.Logf("post missing from mpool, block mining suspended until it arrives") - POOL: - for { - bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) - select { - case <-ctx.Done(): - return - case evt := <-poolEvts: - bm.t.Logf("pool event: %d", evt.Type) - if evt.Type == api.MpoolAdd { - bm.t.Logf("incoming message %v", evt.Message) - if tracker.recordIfPost(bm.t, bm, evt.Message) { - break POOL - } - } - } - } - bm.t.Logf("done waiting on mpool") - } - } + if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us 
past the last epoch in dline, we need to wait for miner to post + bm.forcePoSt(ctx, ts, dlinfo) } var target abi.ChainEpoch @@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur Done: reportSuccessFn, }) success = <-wait + if !success { + // if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post + if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() { + bm.forcePoSt(ctx, ts, dlinfo) + } + } } // Wait until it shows up on the given full nodes ChainHead diff --git a/itests/kit/circuit.go b/itests/kit/circuit.go new file mode 100644 index 000000000..d2857010e --- /dev/null +++ b/itests/kit/circuit.go @@ -0,0 +1,34 @@ +package kit + +import ( + "fmt" + "testing" + "time" +) + +/* +CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish. + +This is how it works: + - It runs the `cb` function until it returns true, + - waiting for `throttle` duration between each iteration, + - or at most `timeout` duration until it breaks test execution. + +You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out, +or you need to set different deadlines in the same test. +*/ +func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) { + tmo := time.After(timeout) + for { + if cb() { + break + } + select { + case <-tmo: + t.Fatal("timeout: ", label) + default: + fmt.Printf("waiting: %s\n", label) + time.Sleep(throttle) + } + } +} diff --git a/itests/mempool_test.go b/itests/mempool_test.go new file mode 100644 index 000000000..a1c2a330e --- /dev/null +++ b/itests/mempool_test.go @@ -0,0 +1,521 @@ +//stm: #integration +package itests + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/stretchr/testify/require" +) + +const mPoolThrottle = time.Millisecond * 100 +const mPoolTimeout = time.Second * 10 + +func TestMemPoolPushSingleNode(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, err := firstNode.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm, err := firstNode.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + // check pending messages for address + kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { 
+ for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushTwoNodes(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + sender2 := secondNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + addr2, _ := secondNode.WalletNew(ctx, types.KTBLS) + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + + const totalMessages = 10 + + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + var sms []*types.SignedMessage + // push messages to message pools of both nodes + for i := 0; i < totalMessages; i++ { + // first + msg1 := &types.Message{ + From: sender, + To: addr, + Value: each, + } + + sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm1.Message.Nonce) + sms = append(sms, sm1) + + // second + msg2 := &types.Message{ + From: sender2, + To: addr2, + Value: each, + } + + sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm2.Message.Nonce) + sms = append(sms, sm2) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool { + pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending1) == 0 && len(pending2) == 0 { + // Check messages on both nodes + for _, lookMsg := range sms { + msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup1) + + msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup2) + } + return true + } + return false + }) +} + +func TestMemPoolClearPending(t *testing.T) { + //stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001 + //stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := 
kit.EnsembleTwoOne(t, kit.MockProofs())
+	ens.InterconnectAll()
+	kit.QuietMiningLogs()
+
+	sender := firstNode.DefaultKey.Address
+
+	addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
+
+	const totalMessages = 10
+
+	bal, err := firstNode.WalletBalance(ctx, sender)
+	require.NoError(t, err)
+	toSend := big.Div(bal, big.NewInt(10))
+	each := big.Div(toSend, big.NewInt(totalMessages))
+
+	// Add a single message, then clear the pool
+	msg := &types.Message{
+		From:  sender,
+		To:    addr,
+		Value: each,
+	}
+	sm, err := firstNode.MpoolPushMessage(ctx, msg, nil)
+	require.NoError(t, err)
+
+	// message should be in the mempool
+	kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool {
+		pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
+		require.NoError(t, err)
+
+		return len(pending) == 1
+	})
+
+	err = firstNode.MpoolClear(ctx, true)
+	require.NoError(t, err)
+
+	// pool should be empty now
+	kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool {
+		pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
+		require.NoError(t, err)
+
+		return len(pending) == 0
+	})
+
+	// mine a couple of blocks
+	ens.BeginMining(blockTime)
+	time.Sleep(5 * blockTime)
+
+	// make sure that the cleared message wasn't picked up and mined
+	_, err = firstNode.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
+	require.Error(t, err)
+}
+
+func TestMemPoolBatchPush(t *testing.T) {
+	//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
+	//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
+	//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
+	//stm: @CHAIN_MEMPOOL_BATCH_PUSH_001
+	ctx := context.Background()
+	const blockTime = 100 * time.Millisecond
+	firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
+	ens.InterconnectAll()
+	kit.QuietMiningLogs()
+
+	sender := firstNode.DefaultKey.Address
+
+	addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
+
+	const totalMessages = 10
+
+	bal, err := firstNode.WalletBalance(ctx, sender)
+	require.NoError(t, err)
+	toSend := big.Div(bal, big.NewInt(10))
+	each := big.Div(toSend, big.NewInt(totalMessages))
+
+	// add messages to be mined/published
+	var sms []*types.SignedMessage
+	for i := 0; i < totalMessages; i++ {
+		msg := &types.Message{
+			From:       sender,
+			To:         addr,
+			Value:      each,
+			Nonce:      uint64(i),
+			GasLimit:   50_000_000,
+			GasFeeCap:  types.NewInt(100_000_000),
+			GasPremium: types.NewInt(1),
+		}
+
+		signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
+		require.NoError(t, err)
+
+		sms = append(sms, signedMessage)
+	}
+
+	_, err = firstNode.MpoolBatchPush(ctx, sms)
+	require.NoError(t, err)
+
+	// check pending messages for address
+	kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool {
+		msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
+		require.NoError(t, err)
+
+		if len(msgStatuses) == totalMessages {
+			for _, msgStatusList := range msgStatuses {
+				for _, status := range msgStatusList {
+					require.True(t, status.OK)
+				}
+			}
+			return true
+		}
+		return false
+	})
+
+	// verify messages should be the ones included in the next block
+	selected, err := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
+	require.NoError(t, err)
+	for _, msg := range sms {
+		found := false
+		for _, selectedMsg := range selected {
+			if selectedMsg.Cid() == msg.Cid() {
+				found = true
+				break
+			}
+		}
+		require.True(t, found)
+	}
+
+	ens.BeginMining(blockTime)
+
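+	// wait (bounded by mPoolTimeout) for the pushed batch to be mined and the pool to drain
+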
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) +} + +func TestMemPoolPushSingleNodeUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: @CHAIN_MEMPOOL_PUSH_003 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + // push untrusted messages + pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage) + require.NoError(t, err) + require.Equal(t, msg.Cid(), pushedCid) + + sms = append(sms, signedMessage) + } + + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // check pending messages for address + msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + // pool pending list should be empty + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +} + +func TestMemPoolBatchPushUntrusted(t *testing.T) { + //stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001 + //stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001 + //stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001 + //stm: 
@CHAIN_MEMPOOL_BATCH_PUSH_002 + ctx := context.Background() + const blockTime = 100 * time.Millisecond + firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs()) + ens.InterconnectAll() + kit.QuietMiningLogs() + + sender := firstNode.DefaultKey.Address + + addr, _ := firstNode.WalletNew(ctx, types.KTBLS) + + const totalMessages = 10 + + bal, err := firstNode.WalletBalance(ctx, sender) + require.NoError(t, err) + toSend := big.Div(bal, big.NewInt(10)) + each := big.Div(toSend, big.NewInt(totalMessages)) + + // add messages to be mined/published + var sms []*types.SignedMessage + for i := 0; i < totalMessages; i++ { + msg := &types.Message{ + From: sender, + To: addr, + Value: each, + Nonce: uint64(i), + GasLimit: 50_000_000, + GasFeeCap: types.NewInt(100_000_000), + GasPremium: types.NewInt(1), + } + + signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg) + require.NoError(t, err) + + sms = append(sms, signedMessage) + } + + _, err = firstNode.MpoolBatchPushUntrusted(ctx, sms) + require.NoError(t, err) + + // check pending messages for address, wait until they are all pushed + kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender) + require.NoError(t, err) + + if len(msgStatuses) == totalMessages { + for _, msgStatusList := range msgStatuses { + for _, status := range msgStatusList { + require.True(t, status.OK) + } + } + return true + } + return false + }) + + // verify messages should be the ones included in the next block + selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0) + + for _, msg := range sms { + found := false + for _, selectedMsg := range selected { + if selectedMsg.Cid() == msg.Cid() { + found = true + break + } + } + require.True(t, found) + } + + ens.BeginMining(blockTime) + + // wait until pending messages are mined, pool pending list should be empty + kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool { + pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK) + require.NoError(t, err) + + if len(pending) == 0 { + // all messages should be added to the chain + for _, lookMsg := range sms { + msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true) + require.NoError(t, err) + require.NotNil(t, msgLookup) + } + return true + } + return false + }) + +} diff --git a/itests/paych_api_test.go b/itests/paych_api_test.go index a07c499f9..7e135a9be 100644 --- a/itests/paych_api_test.go +++ b/itests/paych_api_test.go @@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) { Miner(&miner, &paymentCreator, kit.WithAllSubsystems()). Start(). 
InterconnectAll()
-	bms := ens.BeginMining(blockTime)
+	bms := ens.BeginMiningMustPost(blockTime)
 	bm := bms[0]
 
 	// send some funds to register the receiver
diff --git a/metrics/metrics.go b/metrics/metrics.go
index b4032bb1d..400e76537 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -47,6 +47,12 @@ var (
 	WorkerHostname, _ = tag.NewKey("worker_hostname")
 	StorageID, _      = tag.NewKey("storage_id")
 	SectorState, _    = tag.NewKey("sector_state")
+
+	// rcmgr
+	ServiceID, _  = tag.NewKey("svc")
+	ProtocolID, _ = tag.NewKey("proto")
+	Direction, _  = tag.NewKey("direction")
+	UseFD, _      = tag.NewKey("use_fd")
 )
 
 // Measures
@@ -143,6 +149,22 @@ var (
 	SplitstoreCompactionHot  = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
 	SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
 	SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
+
+	// rcmgr
+	RcmgrAllowConn      = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
+	RcmgrBlockConn      = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
+	RcmgrAllowStream    = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
+	RcmgrBlockStream    = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
+	RcmgrAllowPeer      = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
+	RcmgrBlockPeer      = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
+	RcmgrAllowProto     = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
+	RcmgrBlockProto     = stats.Int64("rcmgr/block_proto", "Number of blocked streams attached to a protocol", stats.UnitDimensionless)
+	RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto_peer", "Number of blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
+	RcmgrAllowSvc       = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
+	RcmgrBlockSvc       = stats.Int64("rcmgr/block_svc", "Number of blocked streams attached to a service", stats.UnitDimensionless)
+	RcmgrBlockSvcPeer   = stats.Int64("rcmgr/block_svc_peer", "Number of blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
+	RcmgrAllowMem       = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
+	RcmgrBlockMem       = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
 )
 
 var (
@@ -496,6 +518,76 @@ var (
 		Measure:     GraphsyncSendingPeersPending,
 		Aggregation: view.LastValue(),
 	}
+
+	// rcmgr
+	RcmgrAllowConnView = &view.View{
+		Measure:     RcmgrAllowConn,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{Direction, UseFD},
+	}
+	RcmgrBlockConnView = &view.View{
+		Measure:     RcmgrBlockConn,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{Direction, UseFD},
+	}
+	RcmgrAllowStreamView = &view.View{
+		Measure:     RcmgrAllowStream,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{PeerID, Direction},
+	}
+	RcmgrBlockStreamView = &view.View{
+		Measure:     RcmgrBlockStream,
+		Aggregation: view.Count(),
+		TagKeys:     []tag.Key{PeerID, Direction},
+	}
+	RcmgrAllowPeerView = &view.View{
+		Measure:     RcmgrAllowPeer,
+		Aggregation: view.Count(),
+		TagKeys:
[]tag.Key{PeerID}, + } + RcmgrBlockPeerView = &view.View{ + Measure: RcmgrBlockPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + } + RcmgrAllowProtoView = &view.View{ + Measure: RcmgrAllowProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoView = &view.View{ + Measure: RcmgrBlockProto, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID}, + } + RcmgrBlockProtoPeerView = &view.View{ + Measure: RcmgrBlockProtoPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ProtocolID, PeerID}, + } + RcmgrAllowSvcView = &view.View{ + Measure: RcmgrAllowSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcView = &view.View{ + Measure: RcmgrBlockSvc, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID}, + } + RcmgrBlockSvcPeerView = &view.View{ + Measure: RcmgrBlockSvcPeer, + Aggregation: view.Count(), + TagKeys: []tag.Key{ServiceID, PeerID}, + } + RcmgrAllowMemView = &view.View{ + Measure: RcmgrAllowMem, + Aggregation: view.Count(), + } + RcmgrBlockMemView = &view.View{ + Measure: RcmgrBlockMem, + Aggregation: view.Count(), + } ) // DefaultViews is an array of OpenCensus views for metric gathering purposes @@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View { GraphsyncSendingTotalMemoryAllocatedView, GraphsyncSendingTotalPendingAllocationsView, GraphsyncSendingPeersPendingView, + + RcmgrAllowConnView, + RcmgrBlockConnView, + RcmgrAllowStreamView, + RcmgrBlockStreamView, + RcmgrAllowPeerView, + RcmgrBlockPeerView, + RcmgrAllowProtoView, + RcmgrBlockProtoView, + RcmgrBlockProtoPeerView, + RcmgrAllowSvcView, + RcmgrBlockSvcView, + RcmgrBlockSvcPeerView, + RcmgrAllowMemView, + RcmgrBlockMemView, } views = append(views, blockstore.DefaultViews...) views = append(views, rpcmetrics.DefaultViews...) 
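To make the tag/measure/view relationship concrete: each resource-manager callback (added to node/modules/lp2p/rcmgr.go below) tags a fresh context with the relevant keys and records a one-unit measurement, and the Count-aggregated views above turn those records into one counter per tag combination. A minimal sketch of that recording pattern, reusing the identifiers introduced in this hunk (the standalone main wrapper is illustrative only):

    package main

    import (
    	"context"

    	"go.opencensus.io/stats"
    	"go.opencensus.io/tag"

    	"github.com/filecoin-project/lotus/metrics"
    )

    func main() {
    	// Tag a fresh context with the direction and fd-usage of a blocked
    	// connection, then bump the corresponding counter. RcmgrBlockConnView
    	// (Count aggregation over the Direction and UseFD tag keys) exposes one
    	// counter per tag combination, assuming the view has been registered
    	// along with the rest of metrics.DefaultViews.
    	ctx, _ := tag.New(context.Background(),
    		tag.Upsert(metrics.Direction, "inbound"),
    		tag.Upsert(metrics.UseFD, "true"),
    	)
    	stats.Record(ctx, metrics.RcmgrBlockConn.M(1))
    }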
diff --git a/node/config/def.go b/node/config/def.go index 157350866..aceeaadf5 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -139,6 +139,7 @@ func DefaultStorageMiner() *StorageMiner { AllowUnseal: true, AllowReplicaUpdate: true, AllowProveReplicaUpdate2: true, + AllowRegenSectorKey: true, // Default to 10 - tcp should still be able to figure this out, and // it's the ratio between 10gbit / 1gbit diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 8b286ff5e..0bc4dd6b2 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -11,9 +11,15 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" ) func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { @@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan // TODO: also set appropriate default limits for lotus protocols libp2p.SetDefaultServiceLimits(limiter) + opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) + if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { debugPath := filepath.Join(repoPath, "debug") if err := os.MkdirAll(debugPath, 0755); err != nil { @@ -70,3 +78,109 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts { Opts: []libp2p.Option{libp2p.ResourceManager(mgr)}, } } + +type rcmgrMetrics struct{} + +func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrAllowConn.M(1)) +} + +func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + if usefd { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false")) + } + stats.Record(ctx, metrics.RcmgrBlockConn.M(1)) +} + +func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + stats.Record(ctx, metrics.RcmgrAllowStream.M(1)) +} + +func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) { + ctx := context.Background() + if dir == network.DirInbound { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound")) + } else { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound")) + } + stats.Record(ctx, metrics.RcmgrBlockStream.M(1)) +} + +func (r rcmgrMetrics) AllowPeer(p peer.ID) { + ctx := context.Background() + stats.Record(ctx, metrics.RcmgrAllowPeer.M(1)) +} + +func (r rcmgrMetrics) BlockPeer(p peer.ID) { + ctx := context.Background() + stats.Record(ctx, metrics.RcmgrBlockPeer.M(1)) +} + 
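+// The remaining callbacks follow a single pattern: upsert the relevant tag
+// (ProtocolID or ServiceID) on a fresh context where one applies, then
+// record a one-unit count against the matching rcmgr measure.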
+func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrAllowProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrBlockProto.M(1)) +} + +func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto))) + stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1)) +} + +func (r rcmgrMetrics) AllowService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrAllowSvc.M(1)) +} + +func (r rcmgrMetrics) BlockService(svc string) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrBlockSvc.M(1)) +} + +func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) { + ctx := context.Background() + ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc)) + stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1)) +} + +func (r rcmgrMetrics) AllowMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1)) +} + +func (r rcmgrMetrics) BlockMemory(size int) { + stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1)) +}
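For reference, the polling pattern the mempool tests above rely on comes from the new kit.CircuitBreaker helper in itests/kit/circuit.go. A self-contained sketch of a typical call, under the helper's signature as added in this diff (the test name, done channel, and durations are illustrative only):

    package kit_test

    import (
    	"testing"
    	"time"

    	"github.com/filecoin-project/lotus/itests/kit"
    )

    func TestCircuitBreakerSketch(t *testing.T) {
    	// Simulate an async operation that completes after one second.
    	done := make(chan struct{})
    	go func() {
    		time.Sleep(time.Second)
    		close(done)
    	}()

    	// Poll the condition every 100ms; if 10s elapse first, the helper
    	// fails the test with the "wait for async op" label.
    	kit.CircuitBreaker(t, "wait for async op", 100*time.Millisecond, 10*time.Second, func() bool {
    		select {
    		case <-done:
    			return true
    		default:
    			return false
    		}
    	})
    }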