Merge branch 'master' into bloxico/cli-sync-tests

This commit is contained in:
Darko Brdareski 2022-02-22 14:03:44 +01:00
commit e753e7a66e
51 changed files with 2336 additions and 139 deletions

View File

@ -850,6 +850,11 @@ workflows:
suite: itest-get_messages_in_ts
target: "./itests/get_messages_in_ts_test.go"
- test:
name: test-itest-mempool
suite: itest-mempool
target: "./itests/mempool_test.go"
- test:
name: test-itest-multisig
suite: itest-multisig

View File

@ -1,5 +1,77 @@
# Lotus changelog
# 1.14.1 / 2022-02-18
This is an **optional** release of lotus, that fixes the incorrect *comment* of network v15 OhSnap upgrade **date**. Note the actual upgrade epoch in [v1.14.0](https://github.com/filecoin-project/lotus/releases/tag/v1.14.0) was correct.
# 1.14.0 / 2022-02-17
This is a MANDATORY release of Lotus that introduces [Filecoin network v15,
codenamed the OhSnap upgrade](https://github.com/filecoin-project/community/discussions/74?sort=new#discussioncomment-1922550).
The network is scheduled to upgrade to v15 on March 1st at 2022-03-01T15:00:00Z. All node operators, including storage providers, must upgrade to this release (or a later release) before that time. Storage providers must update their daemons, miners, and worker(s).
The OhSnap upgrade introduces the following FIPs, delivered in [actors v7](https://github.com/filecoin-project/specs-actors/releases/tag/v7.0.0):
- [FIP-0019 Snap Deals](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0019.md)
- [FIP-0028 Remove Datacap from Verified clients](https://github.com/filecoin-project/FIPs/pull/226)
It is recommended that storage providers download the new params before updating their node, miner, and workers. To do so:
- Download Lotus v1.14.0 or later
- run `make lotus-shed`
- run `./lotus-shed fetch-params` with the appropriate `proving-params` flag
- Upgrade the Lotus daemon and miner **when the previous step is complete**
All node operators, including storage providers, should be aware that a pre-migration will begin at 2022-03-01T13:30:00Z (150 minutes before the real upgrade). The pre-migration will take between 20 and 50 minutes, depending on hardware specs. During this time, expect slower block validation times, increased CPU and memory usage, and longer delays for API queries.
## New Features and Changes
- Integrate actor v7-rc1:
- Integrate v7 actors ([#7617](https://github.com/filecoin-project/lotus/pull/7617))
- feat: state: Fast migration for v15 ([#7933](https://github.com/filecoin-project/lotus/pull/7933))
- fix: blockstore: Add missing locks to autobatch::Get() [#7939](https://github.com/filecoin-project/lotus/pull/7939))
- correctness fixes for the autobatch blockstore ([#7940](https://github.com/filecoin-project/lotus/pull/7940))
- Implement and support [FIP-0019 Snap Deals](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0019.md)
- chore: deps: Integrate proof v11.0.0 ([#7923](https://github.com/filecoin-project/lotus/pull/7923))
- Snap Deals Lotus Integration: FSM Posting and integration test ([#7810](https://github.com/filecoin-project/lotus/pull/7810))
- Feat/sector storage unseal ([#7730](https://github.com/filecoin-project/lotus/pull/7730))
- Feat/snap deals storage ([#7615](https://github.com/filecoin-project/lotus/pull/7615))
- fix: sealing: Add more deal expiration checks during PRU pipeline ([#7871](https://github.com/filecoin-project/lotus/pull/7871))
- chore: deps: Update go-paramfetch ([#7917](https://github.com/filecoin-project/lotus/pull/7917))
- feat: #7880 gas: add gas charge for VerifyReplicaUpdate ([#7897](https://github.com/filecoin-project/lotus/pull/7897))
- enhancement: sectors: disable existing cc upgrade path 2 days before the upgrade epoch ([#7900](https://github.com/filecoin-project/lotus/pull/7900))
## Improvements
- updating to new datastore/blockstore code with contexts ([#7646](https://github.com/filecoin-project/lotus/pull/7646))
- reorder transfer checks so as to ensure sending 2B FIL to yourself fails if you don't have that amount ([#7637](https://github.com/filecoin-project/lotus/pull/7637))
- VM: Circ supply should be constant per epoch ([#7811](https://github.com/filecoin-project/lotus/pull/7811))
## Bug Fixes
- Fix: state: circsuypply calc around null blocks ([#7890](https://github.com/filecoin-project/lotus/pull/7890))
- Mempool msg selection should respect block message limits ([#7321](https://github.com/filecoin-project/lotus/pull/7321))
SplitStore: supress compaction near upgrades ([#7734](https://github.com/filecoin-project/lotus/pull/7734))
## Others
- chore: create pull_request_template.md ([#7726](https://github.com/filecoin-project/lotus/pull/7726))
## Contributors
| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Aayush Rajasekaran | 41 | +5538/-1205 | 189 |
| zenground0 | 11 | +3316/-524 | 124 |
| Jennifer Wang | 29 | +714/-599 | 68 |
| ZenGround0 | 3 | +263/-25 | 11 |
| c r | 2 | +198/-30 | 6 |
| vyzo | 4 | +189/-7 | 7 |
| Aayush | 11 | +146/-48 | 49 |
| web3-bot | 10 | +99/-17 | 10 |
| Steven Allen | 1 | +55/-37 | 1 |
| Jiaying Wang | 5 | +30/-8 | 5 |
| Jakub Sztandera | 2 | +8/-3 | 3 |
| Łukasz Magiera | 1 | +3/-3 | 2 |
| Travis Person | 1 | +2/-2 | 2 |
| Rod Vagg | 1 | +2/-2 | 2 |
# v1.13.2 / 2022-01-09
Lotus v1.13.2 is a *highly recommended* feature release with remarkable retrieval improvements, new features like

View File

@ -345,6 +345,8 @@ gen: actors-gen type-gen method-gen cfgdoc-gen docsgen api-gen circleci
@echo ">>> IF YOU'VE MODIFIED THE CLI OR CONFIG, REMEMBER TO ALSO MAKE docsgen-cli"
.PHONY: gen
jen: gen
snap: lotus lotus-miner lotus-worker
snapcraft
# snapcraft upload ./lotus_*.snap

View File

@ -45,8 +45,9 @@ type Gateway interface {
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
MpoolPush(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetPending(context.Context, address.Address, types.TipSetKey) ([]*MsigTransaction, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetVestingSchedule(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MsigVesting, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)

View File

@ -516,6 +516,8 @@ type GatewayStruct struct {
MsigGetVested func(p0 context.Context, p1 address.Address, p2 types.TipSetKey, p3 types.TipSetKey) (types.BigInt, error) ``
MsigGetVestingSchedule func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) ``
StateAccountKey func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) ``
StateDealProviderCollateralBounds func(p0 context.Context, p1 abi.PaddedPieceSize, p2 bool, p3 types.TipSetKey) (DealCollateralBounds, error) ``
@ -3285,6 +3287,17 @@ func (s *GatewayStub) MsigGetVested(p0 context.Context, p1 address.Address, p2 t
return *new(types.BigInt), ErrNotSupported
}
func (s *GatewayStruct) MsigGetVestingSchedule(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) {
if s.Internal.MsigGetVestingSchedule == nil {
return *new(MsigVesting), ErrNotSupported
}
return s.Internal.MsigGetVestingSchedule(p0, p1, p2)
}
func (s *GatewayStub) MsigGetVestingSchedule(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (MsigVesting, error) {
return *new(MsigVesting), ErrNotSupported
}
func (s *GatewayStruct) StateAccountKey(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (address.Address, error) {
if s.Internal.StateAccountKey == nil {
return *new(address.Address), ErrNotSupported

21
blockstore/context.go Normal file
View File

@ -0,0 +1,21 @@
package blockstore
import (
"context"
)
type hotViewKey struct{}
var hotView = hotViewKey{}
// WithHotView constructs a new context with an option that provides a hint to the blockstore
// (e.g. the splitstore) that the object (and its ipld references) should be kept hot.
func WithHotView(ctx context.Context) context.Context {
return context.WithValue(ctx, hotView, struct{}{})
}
// IsHotView returns true if the hot view option is set in the context
func IsHotView(ctx context.Context) bool {
v := ctx.Value(hotView)
return v != nil
}

View File

@ -161,6 +161,13 @@ type SplitStore struct {
txnSyncCond sync.Cond
txnSync bool
// background cold object reification
reifyWorkers sync.WaitGroup
reifyMx sync.Mutex
reifyCond sync.Cond
reifyPend map[cid.Cid]struct{}
reifyInProgress map[cid.Cid]struct{}
// registered protectors
protectors []func(func(cid.Cid) error) error
}
@ -202,6 +209,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
ss.txnSyncCond.L = &ss.txnSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())
ss.reifyCond.L = &ss.reifyMx
ss.reifyPend = make(map[cid.Cid]struct{})
ss.reifyInProgress = make(map[cid.Cid]struct{})
if enableDebugLog {
ss.debug, err = openDebugLog(path)
if err != nil {
@ -264,7 +275,13 @@ func (s *SplitStore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
return true, nil
}
return s.cold.Has(ctx, cid)
has, err = s.cold.Has(ctx, cid)
if has && bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
return has, err
}
func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
@ -308,8 +325,11 @@ func (s *SplitStore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error)
blk, err = s.cold.Get(ctx, cid)
if err == nil {
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return blk, err
@ -359,6 +379,10 @@ func (s *SplitStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
size, err = s.cold.GetSize(ctx, cid)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return size, err
@ -536,6 +560,10 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
err = s.cold.View(ctx, cid, cb)
if err == nil {
if bstore.IsHotView(ctx) {
s.reifyColdObject(cid)
}
stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
}
return err
@ -645,6 +673,9 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
}
}
// spawn the reifier
go s.reifyOrchestrator()
// watch the chain
chain.SubscribeHeadChanges(s.HeadChange)
@ -676,6 +707,8 @@ func (s *SplitStore) Close() error {
}
}
s.reifyCond.Broadcast()
s.reifyWorkers.Wait()
s.cancel()
return multierr.Combine(s.markSetEnv.Close(), s.debug.Close())
}

View File

@ -0,0 +1,214 @@
package splitstore
import (
"errors"
"runtime"
"sync/atomic"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
)
var (
errReifyLimit = errors.New("reification limit reached")
ReifyLimit = 16384
)
func (s *SplitStore) reifyColdObject(c cid.Cid) {
if !s.isWarm() {
return
}
if isUnitaryObject(c) {
return
}
s.reifyMx.Lock()
defer s.reifyMx.Unlock()
_, ok := s.reifyInProgress[c]
if ok {
return
}
s.reifyPend[c] = struct{}{}
s.reifyCond.Broadcast()
}
func (s *SplitStore) reifyOrchestrator() {
workers := runtime.NumCPU() / 4
if workers < 2 {
workers = 2
}
workch := make(chan cid.Cid, workers)
defer close(workch)
for i := 0; i < workers; i++ {
s.reifyWorkers.Add(1)
go s.reifyWorker(workch)
}
for {
s.reifyMx.Lock()
for len(s.reifyPend) == 0 && atomic.LoadInt32(&s.closing) == 0 {
s.reifyCond.Wait()
}
if atomic.LoadInt32(&s.closing) != 0 {
s.reifyMx.Unlock()
return
}
reifyPend := s.reifyPend
s.reifyPend = make(map[cid.Cid]struct{})
s.reifyMx.Unlock()
for c := range reifyPend {
select {
case workch <- c:
case <-s.ctx.Done():
return
}
}
}
}
func (s *SplitStore) reifyWorker(workch chan cid.Cid) {
defer s.reifyWorkers.Done()
for c := range workch {
s.doReify(c)
}
}
func (s *SplitStore) doReify(c cid.Cid) {
var toreify, totrack, toforget []cid.Cid
defer func() {
s.reifyMx.Lock()
defer s.reifyMx.Unlock()
for _, c := range toreify {
delete(s.reifyInProgress, c)
}
for _, c := range totrack {
delete(s.reifyInProgress, c)
}
for _, c := range toforget {
delete(s.reifyInProgress, c)
}
}()
s.txnLk.RLock()
defer s.txnLk.RUnlock()
count := 0
err := s.walkObjectIncomplete(c, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
}
count++
if count > ReifyLimit {
return errReifyLimit
}
s.reifyMx.Lock()
_, inProgress := s.reifyInProgress[c]
if !inProgress {
s.reifyInProgress[c] = struct{}{}
}
s.reifyMx.Unlock()
if inProgress {
return errStopWalk
}
has, err := s.hot.Has(s.ctx, c)
if err != nil {
return xerrors.Errorf("error checking hotstore: %w", err)
}
if has {
if s.txnMarkSet != nil {
hasMark, err := s.txnMarkSet.Has(c)
if err != nil {
log.Warnf("error checking markset: %s", err)
} else if hasMark {
toforget = append(toforget, c)
return errStopWalk
}
} else {
totrack = append(totrack, c)
return errStopWalk
}
}
toreify = append(toreify, c)
return nil
},
func(missing cid.Cid) error {
log.Warnf("missing reference while reifying %s: %s", c, missing)
return errStopWalk
})
if err != nil {
if xerrors.Is(err, errReifyLimit) {
log.Debug("reification aborted; reify limit reached")
return
}
log.Warnf("error walking cold object for reification (cid: %s): %s", c, err)
return
}
log.Debugf("reifying %d objects rooted at %s", len(toreify), c)
// this should not get too big, maybe some 100s of objects.
batch := make([]blocks.Block, 0, len(toreify))
for _, c := range toreify {
blk, err := s.cold.Get(s.ctx, c)
if err != nil {
log.Warnf("error retrieving cold object for reification (cid: %s): %s", c, err)
continue
}
if err := s.checkClosing(); err != nil {
return
}
batch = append(batch, blk)
}
if len(batch) > 0 {
err = s.hot.PutMany(s.ctx, batch)
if err != nil {
log.Warnf("error reifying cold object (cid: %s): %s", c, err)
return
}
}
if s.txnMarkSet != nil {
if len(toreify) > 0 {
if err := s.txnMarkSet.MarkMany(toreify); err != nil {
log.Warnf("error marking reified objects: %s", err)
}
}
if len(totrack) > 0 {
if err := s.txnMarkSet.MarkMany(totrack); err != nil {
log.Warnf("error marking tracked objects: %s", err)
}
}
} else {
// if txnActive is false these are noops
if len(toreify) > 0 {
s.trackTxnRefMany(toreify)
}
if len(totrack) > 0 {
s.trackTxnRefMany(totrack)
}
}
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"os"
"sync"
"sync/atomic"
@ -387,6 +388,235 @@ func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
}
}
func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()
mkRandomBlock := func() blocks.Block {
data := make([]byte, 128)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
return blocks.NewBlock(data)
}
block1 := mkRandomBlock()
block2 := mkRandomBlock()
block3 := mkRandomBlock()
hdr := mock.MkBlock(nil, 0, 0)
hdr.Messages = block1.Cid()
hdr.ParentMessageReceipts = block2.Cid()
hdr.ParentStateRoot = block3.Cid()
block4, err := hdr.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
allBlocks := []blocks.Block{block1, block2, block3, block4}
for _, blk := range allBlocks {
err := cold.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
}
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint
ss.warmupEpoch = 1
go ss.reifyOrchestrator()
waitForReification := func() {
for {
ss.reifyMx.Lock()
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
ss.reifyMx.Unlock()
if ready {
return
}
time.Sleep(time.Millisecond)
}
}
// first access using the standard view
err = f(context.Background(), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
// nothing should be reified
waitForReification()
for _, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("block unexpectedly reified")
}
}
// now make the hot/reifying view and ensure access reifies
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
// everything should be reified
waitForReification()
for i, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatalf("block%d was not reified", i+1)
}
}
}
func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()
mkRandomBlock := func() blocks.Block {
data := make([]byte, 128)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
return blocks.NewBlock(data)
}
block1 := mkRandomBlock()
block2 := mkRandomBlock()
block3 := mkRandomBlock()
hdr := mock.MkBlock(nil, 0, 0)
hdr.Messages = block1.Cid()
hdr.ParentMessageReceipts = block2.Cid()
hdr.ParentStateRoot = block3.Cid()
block4, err := hdr.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
allBlocks := []blocks.Block{block1, block2, block3, block4}
for _, blk := range allBlocks {
err := cold.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
}
path, err := ioutil.TempDir("", "splitstore.*")
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(path)
})
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map"})
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint
ss.warmupEpoch = 1
go ss.reifyOrchestrator()
waitForReification := func() {
for {
ss.reifyMx.Lock()
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
ss.reifyMx.Unlock()
if ready {
return
}
time.Sleep(time.Millisecond)
}
}
// do a hot access -- nothing should be reified as the limit should be exceeded
oldReifyLimit := ReifyLimit
ReifyLimit = 2
t.Cleanup(func() {
ReifyLimit = oldReifyLimit
})
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
waitForReification()
for _, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("block unexpectedly reified")
}
}
}
func TestSplitStoreReification(t *testing.T) {
t.Log("test reification with Has")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
return err
})
t.Log("test reification with Get")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Get(ctx, c)
return err
})
t.Log("test reification with GetSize")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.GetSize(ctx, c)
return err
})
t.Log("test reification with View")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
return s.View(ctx, c, func(_ []byte) error { return nil })
})
t.Log("test reification limit")
testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
return err
})
}
type mockChain struct {
t testing.TB

View File

@ -26,6 +26,10 @@ type tmpVisitor struct {
var _ ObjectVisitor = (*tmpVisitor)(nil)
func (v *tmpVisitor) Visit(c cid.Cid) (bool, error) {
if isUnitaryObject(c) {
return false, nil
}
return v.set.Visit(c), nil
}
@ -45,6 +49,10 @@ func newConcurrentVisitor() *concurrentVisitor {
}
func (v *concurrentVisitor) Visit(c cid.Cid) (bool, error) {
if isUnitaryObject(c) {
return false, nil
}
v.mx.Lock()
defer v.mx.Unlock()

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -67,7 +67,8 @@ const UpgradeHyperdriveHeight = 892800
// 2021-10-26T13:30:00Z
const UpgradeChocolateHeight = 1231620
var UpgradeOhSnapHeight = abi.ChainEpoch(999999999999)
// 2022-03-01T15:00:00Z
var UpgradeOhSnapHeight = abi.ChainEpoch(1594680)
func init() {
if os.Getenv("LOTUS_USE_TEST_ADDRESSES") != "1" {

View File

@ -32,6 +32,7 @@ import (
/* inline-gen end */
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin"
@ -92,6 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, sm *stmgr.StateManager
partDone()
}()
ctx = blockstore.WithHotView(ctx)
makeVmWithBaseStateAndEpoch := func(base cid.Cid, e abi.ChainEpoch) (*vm.VM, error) {
vmopt := &vm.VMOpts{
StateBase: base,

View File

@ -165,13 +165,8 @@ func DefaultUpgradeSchedule() stmgr.UpgradeSchedule {
Migration: UpgradeActorsV7,
PreMigrations: []stmgr.PreMigration{{
PreMigration: PreUpgradeActorsV7,
StartWithin: 120,
StartWithin: 180,
DontStartWithin: 60,
StopWithin: 35,
}, {
PreMigration: PreUpgradeActorsV7,
StartWithin: 30,
DontStartWithin: 15,
StopWithin: 5,
}},
Expensive: true,
@ -1264,7 +1259,7 @@ func upgradeActorsV7Common(
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
config nv15.Config,
) (cid.Cid, error) {
writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB)
writeStore := blockstore.NewAutobatch(ctx, sm.ChainStore().StateBlockstore(), units.GiB/4)
// TODO: pretty sure we'd achieve nothing by doing this, confirm in review
//buf := blockstore.NewTieredBstore(sm.ChainStore().StateBlockstore(), writeStore)
store := store.ActorStore(ctx, writeStore)

View File

@ -0,0 +1,224 @@
//stm: #unit
package messagepool
import (
"context"
"fmt"
"testing"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/assert"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/lotus/chain/wallet"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
)
func init() {
_ = logging.SetLogLevel("*", "INFO")
}
func getCheckMessageStatus(statusCode api.CheckStatusCode, msgStatuses []api.MessageCheckStatus) (*api.MessageCheckStatus, error) {
for i := 0; i < len(msgStatuses); i++ {
iMsgStatuses := msgStatuses[i]
if iMsgStatuses.CheckStatus.Code == statusCode {
return &iMsgStatuses, nil
}
}
return nil, fmt.Errorf("Could not find CheckStatusCode %s", statusCode)
}
func TestCheckMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
var protos []*api.MessagePrototype
for i := 0; i < 5; i++ {
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: uint64(i),
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
proto := &api.MessagePrototype{
Message: *msg,
ValidNonce: true,
}
protos = append(protos, proto)
}
messageStatuses, err := mp.CheckMessages(context.TODO(), protos)
assert.NoError(t, err)
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
for j := 0; j < len(iMsgStatuses); j++ {
jStatus := iMsgStatuses[i]
assert.True(t, jStatus.OK)
}
}
}
func TestCheckPendingMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
// add a valid message to the pool
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
mustAdd(t, mp, sm)
messageStatuses, err := mp.CheckPendingMessages(context.TODO(), sender)
assert.NoError(t, err)
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
for j := 0; j < len(iMsgStatuses); j++ {
jStatus := iMsgStatuses[i]
assert.True(t, jStatus.OK)
}
}
}
func TestCheckReplaceMessages(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CHECK_REPLACE_MESSAGES_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
if err != nil {
t.Fatal(err)
}
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
if err != nil {
t.Fatal(err)
}
sender, err := w.WalletNew(context.Background(), types.KTSecp256k1)
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1000e15)
target := mock.Address(1001)
// add a valid message to the pool
msg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 2<<10),
}
sig, err := w.WalletSign(context.TODO(), sender, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
mustAdd(t, mp, sm)
// create a new message with the same data, except that it is too big
var msgs []*types.Message
invalidmsg := &types.Message{
To: target,
From: sender,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 128<<10),
}
msgs = append(msgs, invalidmsg)
{
messageStatuses, err := mp.CheckReplaceMessages(context.TODO(), msgs)
if err != nil {
t.Fatal(err)
}
for i := 0; i < len(messageStatuses); i++ {
iMsgStatuses := messageStatuses[i]
status, err := getCheckMessageStatus(api.CheckStatusMessageSize, iMsgStatuses)
if err != nil {
t.Fatal(err)
}
// the replacement message should cause a status error
assert.False(t, status.OK)
}
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
@ -226,6 +227,8 @@ func mustAdd(t *testing.T, mp *MessagePool, msg *types.SignedMessage) {
}
func TestMessagePool(t *testing.T) {
//stm: @CHAIN_MEMPOOL_GET_NONCE_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
@ -327,6 +330,7 @@ func TestCheckMessageBig(t *testing.T) {
Message: *msg,
Signature: *sig,
}
//stm: @CHAIN_MEMPOOL_PUSH_001
err = mp.Add(context.TODO(), sm)
assert.ErrorIs(t, err, ErrMessageTooBig)
}
@ -760,3 +764,302 @@ func TestUpdates(t *testing.T) {
t.Fatal("expected closed channel, but got an update instead")
}
}
func TestMessageBelowMinGasFee(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
// fee is just below minimum gas fee
fee := minimumBaseFee.Uint64() - 1
{
msg := &types.Message{
To: to,
From: from,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(fee),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
err = mp.Add(context.TODO(), sm)
assert.ErrorIs(t, err, ErrGasFeeCapTooLow)
}
}
func TestMessageValueTooHigh(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
totalFil := types.TotalFilecoinInt
extra := types.NewInt(1)
value := types.BigAdd(totalFil, extra)
{
msg := &types.Message{
To: to,
From: from,
Value: value,
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
sig, err := w.WalletSign(context.TODO(), from, msg.Cid().Bytes(), api.MsgMeta{})
if err != nil {
panic(err)
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *sig,
}
err = mp.Add(context.TODO(), sm)
assert.Error(t, err)
}
}
func TestMessageSignatureInvalid(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
msg := &types.Message{
To: to,
From: from,
Value: types.NewInt(1),
Nonce: 0,
GasLimit: 50000000,
GasFeeCap: types.NewInt(minimumBaseFee.Uint64()),
GasPremium: types.NewInt(1),
Params: make([]byte, 32<<10),
}
badSig := &crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: make([]byte, 0),
}
sm := &types.SignedMessage{
Message: *msg,
Signature: *badSig,
}
err = mp.Add(context.TODO(), sm)
assert.Error(t, err)
// assert.Contains(t, err.Error(), "invalid signature length")
assert.Error(t, err)
}
}
func TestAddMessageTwice(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
// create a valid messages
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// try to add it twice
err = mp.Add(context.TODO(), sm)
// assert.Contains(t, err.Error(), "with nonce 0 already in mpool")
assert.Error(t, err)
}
}
func TestAddMessageTwiceNonceGap(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
// create message with invalid nonce (1)
sm := makeTestMessage(w, from, to, 1, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// then try to add message again
err = mp.Add(context.TODO(), sm)
// assert.Contains(t, err.Error(), "unfulfilled nonce gap")
assert.Error(t, err)
}
}
func TestAddMessageTwiceCidDiff(t *testing.T) {
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// Create message with different data, so CID is different
sm2 := makeTestMessage(w, from, to, 0, 50_000_001, minimumBaseFee.Uint64())
//stm: @CHAIN_MEMPOOL_PUSH_001
// then try to add message again
err = mp.Add(context.TODO(), sm2)
// assert.Contains(t, err.Error(), "replace by fee has too low GasPremium")
assert.Error(t, err)
}
}
func TestAddMessageTwiceCidDiffReplaced(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
// Create message with different data, so CID is different
sm2 := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64()*2)
mustAdd(t, mp, sm2)
}
}
func TestRemoveMessage(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001
tma := newTestMpoolAPI()
w, err := wallet.NewWallet(wallet.NewMemKeyStore())
assert.NoError(t, err)
from, err := w.WalletNew(context.Background(), types.KTBLS)
assert.NoError(t, err)
tma.setBalance(from, 1000e9)
ds := datastore.NewMapDatastore()
mp, err := New(context.Background(), tma, ds, filcns.DefaultUpgradeSchedule(), "mptest", nil)
assert.NoError(t, err)
to := mock.Address(1001)
{
sm := makeTestMessage(w, from, to, 0, 50_000_000, minimumBaseFee.Uint64())
mustAdd(t, mp, sm)
//stm: @CHAIN_MEMPOOL_REMOVE_001
// remove message for sender
mp.Remove(context.TODO(), from, sm.Message.Nonce, true)
//stm: @CHAIN_MEMPOOL_PENDING_FOR_001
// check messages in pool: should be none present
msgs := mp.pendingFor(context.TODO(), from)
assert.Len(t, msgs, 0)
}
}

View File

@ -1,3 +1,4 @@
//stm: #unit
package messagepool
import (
@ -16,6 +17,7 @@ import (
)
func TestRepubMessages(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001
oldRepublishBatchDelay := RepublishBatchDelay
RepublishBatchDelay = time.Microsecond
defer func() {
@ -57,6 +59,7 @@ func TestRepubMessages(t *testing.T) {
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
//stm: @CHAIN_MEMPOOL_PUSH_001
_, err := mp.Push(context.TODO(), m)
if err != nil {
t.Fatal(err)

View File

@ -1,3 +1,4 @@
//stm: #unit
package messagepool
import (
@ -74,6 +75,8 @@ func makeTestMpool() (*MessagePool, *testMpoolAPI) {
}
func TestMessageChains(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
mp, tma := makeTestMpool()
// the actors
@ -310,6 +313,8 @@ func TestMessageChains(t *testing.T) {
}
func TestMessageChainSkipping(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001
// regression test for chain skip bug
mp, tma := makeTestMpool()
@ -382,6 +387,7 @@ func TestMessageChainSkipping(t *testing.T) {
}
func TestBasicMessageSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
@ -532,6 +538,7 @@ func TestBasicMessageSelection(t *testing.T) {
}
func TestMessageSelectionTrimmingGas(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -595,6 +602,7 @@ func TestMessageSelectionTrimmingGas(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -641,6 +649,7 @@ func TestMessageSelectionTrimmingMsgsBasic(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -707,6 +716,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersBasic(t *testing.T) {
}
func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -788,6 +798,7 @@ func TestMessageSelectionTrimmingMsgsTwoSendersAdvanced(t *testing.T) {
}
func TestPriorityMessageSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -867,6 +878,7 @@ func TestPriorityMessageSelection(t *testing.T) {
}
func TestPriorityMessageSelection2(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -934,6 +946,7 @@ func TestPriorityMessageSelection2(t *testing.T) {
}
func TestPriorityMessageSelection3(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
mp, tma := makeTestMpool()
// the actors
@ -1028,6 +1041,8 @@ func TestPriorityMessageSelection3(t *testing.T) {
}
func TestOptimalMessageSelection1(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses just a single actor sending messages with a low tq
// the chain depenent merging algorithm should pick messages from the actor
// from the start
@ -1094,6 +1109,8 @@ func TestOptimalMessageSelection1(t *testing.T) {
}
func TestOptimalMessageSelection2(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses two actors sending messages to each other, with the first
// actor paying (much) higher gas premium than the second.
// We select with a low ticket quality; the chain depenent merging algorithm should pick
@ -1173,6 +1190,8 @@ func TestOptimalMessageSelection2(t *testing.T) {
}
func TestOptimalMessageSelection3(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
// this test uses 10 actors sending a block of messages to each other, with the the first
// actors paying higher gas premium than the subsequent actors.
// We select with a low ticket quality; the chain dependent merging algorithm should pick
@ -1416,6 +1435,8 @@ func makeZipfPremiumDistribution(rng *rand.Rand) func() uint64 {
}
func TestCompetitiveMessageSelectionExp(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
if testing.Short() {
t.Skip("skipping in short mode")
}
@ -1439,6 +1460,8 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) {
}
func TestCompetitiveMessageSelectionZipf(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @CHAIN_MEMPOOL_SELECT_001
if testing.Short() {
t.Skip("skipping in short mode")
}
@ -1462,6 +1485,7 @@ func TestCompetitiveMessageSelectionZipf(t *testing.T) {
}
func TestGasReward(t *testing.T) {
//stm: @CHAIN_MEMPOOL_GET_GAS_REWARD_001
tests := []struct {
Premium uint64
FeeCap uint64
@ -1494,6 +1518,8 @@ func TestGasReward(t *testing.T) {
}
func TestRealWorldSelection(t *testing.T) {
//stm: @TOKEN_WALLET_NEW_001, @TOKEN_WALLET_SIGN_001, @CHAIN_MEMPOOL_SELECT_001
// load test-messages.json.gz and rewrite the messages so that
// 1) we map each real actor to a test actor so that we can sign the messages
// 2) adjust the nonces so that they start from 0

View File

@ -2,8 +2,8 @@ package mock
import (
"context"
"crypto/rand"
"fmt"
"math/rand"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
@ -90,19 +90,24 @@ func TipSet(blks ...*types.BlockHeader) *types.TipSet {
return ts
}
func RandomActorAddress() (*address.Address, error) {
bytes := make([]byte, 32)
_, err := rand.Read(bytes)
if err != nil {
return nil, err
}
// Generates count new addresses using the provided seed, and returns them
func RandomActorAddresses(seed int64, count int) ([]*address.Address, error) {
randAddrs := make([]*address.Address, count)
source := rand.New(rand.NewSource(seed))
for i := 0; i < count; i++ {
bytes := make([]byte, 32)
_, err := source.Read(bytes)
if err != nil {
return nil, err
}
addr, err := address.NewActorAddress(bytes)
if err != nil {
return nil, err
addr, err := address.NewActorAddress(bytes)
if err != nil {
return nil, err
}
randAddrs[i] = &addr
}
return &addr, nil
return randAddrs, nil
}
func UnsignedMessage(from, to address.Address, nonce uint64) *types.Message {

View File

@ -1,3 +1,4 @@
//stm: #cli
package cli
import (
@ -32,6 +33,7 @@ func TestChainHead(t *testing.T) {
mockApi.EXPECT().ChainHead(ctx).Return(ts, nil),
)
//stm: @CLI_CHAIN_HEAD_001
err := app.Run([]string{"chain", "head"})
assert.NoError(t, err)
@ -55,6 +57,7 @@ func TestGetBlock(t *testing.T) {
mockApi.EXPECT().ChainGetParentReceipts(ctx, block.Cid()).Return([]*types.MessageReceipt{}, nil),
)
//stm: @CLI_CHAIN_GET_BLOCK_001
err := app.Run([]string{"chain", "getblock", block.Cid().String()})
assert.NoError(t, err)
@ -89,6 +92,7 @@ func TestReadOjb(t *testing.T) {
mockApi.EXPECT().ChainReadObj(ctx, block.Cid()).Return(obj.Bytes(), nil),
)
//stm: @CLI_CHAIN_READ_OBJECT_001
err = app.Run([]string{"chain", "read-obj", block.Cid().String()})
assert.NoError(t, err)
@ -104,6 +108,7 @@ func TestChainDeleteObj(t *testing.T) {
app, _, _, done := NewMockAppWithFullAPI(t, cmd)
defer done()
//stm: @CLI_CHAIN_DELETE_OBJECT_002
err := app.Run([]string{"chain", "delete-obj", block.Cid().String()})
assert.Error(t, err)
})
@ -120,6 +125,7 @@ func TestChainDeleteObj(t *testing.T) {
mockApi.EXPECT().ChainDeleteObj(ctx, block.Cid()).Return(nil),
)
//stm: @CLI_CHAIN_DELETE_OBJECT_001
err := app.Run([]string{"chain", "delete-obj", "--really-do-it=true", block.Cid().String()})
assert.NoError(t, err)
@ -152,6 +158,7 @@ func TestChainStatObj(t *testing.T) {
mockApi.EXPECT().ChainStatObj(ctx, block.Cid(), cid.Undef).Return(stat, nil),
)
//stm: @CLI_CHAIN_STAT_OBJECT_001
err := app.Run([]string{"chain", "stat-obj", block.Cid().String()})
assert.NoError(t, err)
@ -170,6 +177,7 @@ func TestChainStatObj(t *testing.T) {
mockApi.EXPECT().ChainStatObj(ctx, block.Cid(), block.Cid()).Return(stat, nil),
)
//stm: @CLI_CHAIN_STAT_OBJECT_002
err := app.Run([]string{"chain", "stat-obj", fmt.Sprintf("-base=%s", block.Cid().String()), block.Cid().String()})
assert.NoError(t, err)
@ -181,11 +189,11 @@ func TestChainGetMsg(t *testing.T) {
app, mockApi, buf, done := NewMockAppWithFullAPI(t, WithCategory("chain", ChainGetMsgCmd))
defer done()
from, err := mock.RandomActorAddress()
addrs, err := mock.RandomActorAddresses(12345, 2)
assert.NoError(t, err)
to, err := mock.RandomActorAddress()
assert.NoError(t, err)
from := addrs[0]
to := addrs[1]
msg := mock.UnsignedMessage(*from, *to, 0)
@ -200,6 +208,7 @@ func TestChainGetMsg(t *testing.T) {
mockApi.EXPECT().ChainReadObj(ctx, msg.Cid()).Return(obj.Bytes(), nil),
)
//stm: @CLI_CHAIN_GET_MESSAGE_001
err = app.Run([]string{"chain", "getmessage", msg.Cid().String()})
assert.NoError(t, err)
@ -229,6 +238,7 @@ func TestSetHead(t *testing.T) {
mockApi.EXPECT().ChainSetHead(ctx, genesis.Key()).Return(nil),
)
//stm: @CLI_CHAIN_SET_HEAD_003
err := app.Run([]string{"chain", "sethead", "-genesis=true", ts.Key().String()})
assert.NoError(t, err)
})
@ -246,6 +256,7 @@ func TestSetHead(t *testing.T) {
mockApi.EXPECT().ChainSetHead(ctx, genesis.Key()).Return(nil),
)
//stm: @CLI_CHAIN_SET_HEAD_002
err := app.Run([]string{"chain", "sethead", fmt.Sprintf("-epoch=%s", epoch), ts.Key().String()})
assert.NoError(t, err)
})
@ -263,8 +274,7 @@ func TestSetHead(t *testing.T) {
mockApi.EXPECT().ChainSetHead(ctx, ts.Key()).Return(nil),
)
// ts.Key should be passed as an array of arguments (CIDs)
// since we have only one CID in the key, this is ok
//stm: @CLI_CHAIN_SET_HEAD_001
err := app.Run([]string{"chain", "sethead", ts.Key().Cids()[0].String()})
assert.NoError(t, err)
})
@ -274,11 +284,11 @@ func TestInspectUsage(t *testing.T) {
cmd := WithCategory("chain", ChainInspectUsage)
ts := mock.TipSet(mock.MkBlock(nil, 0, 0))
from, err := mock.RandomActorAddress()
addrs, err := mock.RandomActorAddresses(12345, 2)
assert.NoError(t, err)
to, err := mock.RandomActorAddress()
assert.NoError(t, err)
from := addrs[0]
to := addrs[1]
msg := mock.UnsignedMessage(*from, *to, 0)
msgs := []api.Message{{Cid: msg.Cid(), Message: msg}}
@ -303,6 +313,7 @@ func TestInspectUsage(t *testing.T) {
mockApi.EXPECT().StateGetActor(ctx, *to, ts.Key()).Return(actor, nil),
)
//stm: @CLI_CHAIN_INSPECT_USAGE_001
err := app.Run([]string{"chain", "inspect-usage"})
assert.NoError(t, err)
@ -311,7 +322,10 @@ func TestInspectUsage(t *testing.T) {
// output is plaintext, had to do string matching
assert.Contains(t, out, from.String())
assert.Contains(t, out, to.String())
assert.Contains(t, out, "Send")
// check for gas by sender
assert.Contains(t, out, "By Sender")
// check for gas by method
assert.Contains(t, out, "By Method:\nSend")
})
}
@ -322,13 +336,11 @@ func TestChainList(t *testing.T) {
blk.Height = 1
head := mock.TipSet(blk)
head.Height()
from, err := mock.RandomActorAddress()
addrs, err := mock.RandomActorAddresses(12345, 2)
assert.NoError(t, err)
to, err := mock.RandomActorAddress()
assert.NoError(t, err)
from := addrs[0]
to := addrs[1]
msg := mock.UnsignedMessage(*from, *to, 0)
msgs := []api.Message{{Cid: msg.Cid(), Message: msg}}
@ -352,6 +364,7 @@ func TestChainList(t *testing.T) {
mockApi.EXPECT().ChainGetBlockMessages(ctx, head.Blocks()[0].Cid()).Return(blockMsgs, nil),
)
//stm: CLI_CHAIN_LIST_001
err := app.Run([]string{"chain", "love", "--gas-stats=true"}) // chain is love ❤️
assert.NoError(t, err)
@ -382,6 +395,7 @@ func TestChainGet(t *testing.T) {
mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil),
)
//stm: @CLI_CHAIN_GET_001
err := app.Run([]string{"chain", "get", path})
assert.NoError(t, err)
@ -407,6 +421,7 @@ func TestChainGet(t *testing.T) {
mockApi.EXPECT().ChainGetNode(ctx, p2).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil),
)
//stm: @CLI_CHAIN_GET_002
err := app.Run([]string{"chain", "get", p1})
assert.NoError(t, err)
@ -430,6 +445,7 @@ func TestChainGet(t *testing.T) {
mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk.Cid(), Obj: blk}, nil),
)
//stm: @CLI_CHAIN_GET_004
err := app.Run([]string{"chain", "get", "-as-type=foo", path})
assert.Error(t, err)
})
@ -465,6 +481,7 @@ func TestChainBisect(t *testing.T) {
mockApi.EXPECT().ChainGetNode(ctx, path).Return(&api.IpldObject{Cid: blk2.Cid(), Obj: blk2}, nil),
)
//stm: @CLI_CHAIN_BISECT_001
err := app.Run([]string{"chain", "bisect", fmt.Sprintf("%d", minHeight), fmt.Sprintf("%d", maxHeight), subpath, shell})
assert.NoError(t, err)
@ -497,6 +514,7 @@ func TestChainExport(t *testing.T) {
mockApi.EXPECT().ChainExport(ctx, abi.ChainEpoch(0), false, ts.Key()).Return(export, nil),
)
//stm: @CLI_CHAIN_EXPORT_001
err := app.Run([]string{"chain", "export", "whatever.car"})
assert.NoError(t, err)
@ -522,6 +540,7 @@ func TestChainGasPrice(t *testing.T) {
calls++
})
//stm: @CLI_CHAIN_GAS_PRICE_001
err := app.Run([]string{"chain", "gas-price"})
assert.NoError(t, err)

View File

@ -667,6 +667,8 @@ uiLoop:
state = "miner"
case "miner":
maddrs = maddrs[:0]
ask = ask[:0]
afmt.Print("Miner Addresses (f0.. f0..), none to find: ")
_maddrsStr, _, err := rl.ReadLine()
@ -802,7 +804,8 @@ uiLoop:
dealCount, err = strconv.ParseInt(string(dealcStr), 10, 64)
if err != nil {
return err
printErr(xerrors.Errorf("reading deal count: invalid number"))
continue
}
color.Blue(".. Picking miners")
@ -859,12 +862,13 @@ uiLoop:
a, err := api.ClientQueryAsk(ctx, *mi.PeerId, maddr)
if err != nil {
printErr(xerrors.Errorf("failed to query ask: %w", err))
printErr(xerrors.Errorf("failed to query ask for miner %s: %w", maddr.String(), err))
state = "miner"
continue uiLoop
}
ask = append(ask, *a)
}
// TODO: run more validation

View File

@ -126,7 +126,7 @@ func infoCmdAct(cctx *cli.Context) error {
alerts, err := minerApi.LogAlerts(ctx)
if err != nil {
return xerrors.Errorf("getting alerts: %w", err)
fmt.Printf("ERROR: getting alerts: %s\n", err)
}
activeAlerts := make([]alerting.Alert, 0)

View File

@ -96,6 +96,11 @@ var infoAllCmd = &cli.Command{
fmt.Println("ERROR: ", err)
}
fmt.Println("\n#: Storage Locks")
if err := storageLocks.Action(cctx); err != nil {
fmt.Println("ERROR: ", err)
}
fmt.Println("\n#: Sched Diag")
if err := sealingSchedDiagCmd.Action(cctx); err != nil {
fmt.Println("ERROR: ", err)
@ -192,6 +197,11 @@ var infoAllCmd = &cli.Command{
fmt.Println("ERROR: ", err)
}
fmt.Println("\n#: Storage Sector List")
if err := storageListSectorsCmd.Action(cctx); err != nil {
fmt.Println("ERROR: ", err)
}
fmt.Println("\n#: Expired Sectors")
if err := sectorsExpiredCmd.Action(cctx); err != nil {
fmt.Println("ERROR: ", err)

View File

@ -467,12 +467,15 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
smgr, err := sectorstorage.New(ctx, lstor, stor, lr, si, sectorstorage.SealerConfig{
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
AllowUnseal: true,
ParallelFetchLimit: 10,
AllowAddPiece: true,
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
AllowUnseal: true,
AllowReplicaUpdate: true,
AllowProveReplicaUpdate2: true,
AllowRegenSectorKey: true,
}, wsts, smsts)
if err != nil {
return err

View File

@ -161,7 +161,7 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("Expiration:\t\t%v\n", status.Expiration)
fmt.Printf("DealWeight:\t\t%v\n", status.DealWeight)
fmt.Printf("VerifiedDealWeight:\t\t%v\n", status.VerifiedDealWeight)
fmt.Printf("InitialPledge:\t\t%v\n", status.InitialPledge)
fmt.Printf("InitialPledge:\t\t%v\n", types.FIL(status.InitialPledge))
fmt.Printf("\nExpiration Info\n")
fmt.Printf("OnTime:\t\t%v\n", status.OnTime)
fmt.Printf("Early:\t\t%v\n", status.Early)
@ -294,8 +294,14 @@ var sectorsListCmd = &cli.Command{
Aliases: []string{"e"},
},
&cli.BoolFlag{
Name: "seal-time",
Usage: "display how long it took for the sector to be sealed",
Name: "initial-pledge",
Usage: "display initial pledge",
Aliases: []string{"p"},
},
&cli.BoolFlag{
Name: "seal-time",
Usage: "display how long it took for the sector to be sealed",
Aliases: []string{"t"},
},
&cli.StringFlag{
Name: "states",
@ -405,6 +411,7 @@ var sectorsListCmd = &cli.Command{
tablewriter.Col("Deals"),
tablewriter.Col("DealWeight"),
tablewriter.Col("VerifiedPower"),
tablewriter.Col("Pledge"),
tablewriter.NewLineCol("Error"),
tablewriter.NewLineCol("RecoveryTimeout"))
@ -483,6 +490,9 @@ var sectorsListCmd = &cli.Command{
m["RecoveryTimeout"] = color.YellowString(lcli.EpochTime(head.Height(), st.Early))
}
}
if inSSet && cctx.Bool("initial-pledge") {
m["Pledge"] = types.FIL(st.InitialPledge).Short()
}
}
if !fast && deals > 0 {

View File

@ -368,6 +368,7 @@ type storedSector struct {
store stores.SectorStorageInfo
unsealed, sealed, cache bool
update, updatecache bool
}
var storageFindCmd = &cli.Command{
@ -421,6 +422,16 @@ var storageFindCmd = &cli.Command{
return xerrors.Errorf("finding cache: %w", err)
}
us, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdate, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed: %w", err)
}
uc, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUpdateCache, 0, false)
if err != nil {
return xerrors.Errorf("finding cache: %w", err)
}
byId := map[stores.ID]*storedSector{}
for _, info := range u {
sts, ok := byId[info.ID]
@ -455,6 +466,28 @@ var storageFindCmd = &cli.Command{
}
sts.cache = true
}
for _, info := range us {
sts, ok := byId[info.ID]
if !ok {
sts = &storedSector{
id: info.ID,
store: info,
}
byId[info.ID] = sts
}
sts.update = true
}
for _, info := range uc {
sts, ok := byId[info.ID]
if !ok {
sts = &storedSector{
id: info.ID,
store: info,
}
byId[info.ID] = sts
}
sts.updatecache = true
}
local, err := nodeApi.StorageLocal(ctx)
if err != nil {
@ -480,6 +513,12 @@ var storageFindCmd = &cli.Command{
if info.cache {
types += "Cache, "
}
if info.update {
types += "Update, "
}
if info.updatecache {
types += "UpdateCache, "
}
fmt.Printf("In %s (%s)\n", info.id, types[:len(types)-2])
fmt.Printf("\tSealing: %t; Storage: %t\n", info.store.CanSeal, info.store.CanStore)

View File

@ -173,6 +173,11 @@ var runCmd = &cli.Command{
Usage: "enable prove replica update 2",
Value: true,
},
&cli.BoolFlag{
Name: "regen-sector-key",
Usage: "enable regen sector key",
Value: true,
},
&cli.IntFlag{
Name: "parallel-fetch-limit",
Usage: "maximum fetch operations to run in parallel",
@ -278,12 +283,15 @@ var runCmd = &cli.Command{
if cctx.Bool("commit") {
taskTypes = append(taskTypes, sealtasks.TTCommit2)
}
if cctx.Bool("replicaupdate") {
if cctx.Bool("replica-update") {
taskTypes = append(taskTypes, sealtasks.TTReplicaUpdate)
}
if cctx.Bool("prove-replica-update2") {
taskTypes = append(taskTypes, sealtasks.TTProveReplicaUpdate2)
}
if cctx.Bool("regen-sector-key") {
taskTypes = append(taskTypes, sealtasks.TTRegenSectorKey)
}
if len(taskTypes) == 0 {
return xerrors.Errorf("no task types specified")

View File

@ -22,11 +22,14 @@ var tasksCmd = &cli.Command{
}
var allowSetting = map[sealtasks.TaskType]struct{}{
sealtasks.TTAddPiece: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},
sealtasks.TTUnseal: {},
sealtasks.TTAddPiece: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},
sealtasks.TTUnseal: {},
sealtasks.TTReplicaUpdate: {},
sealtasks.TTProveReplicaUpdate2: {},
sealtasks.TTRegenSectorKey: {},
}
var settableStr = func() string {

View File

@ -508,12 +508,19 @@ var genesisSetRemainderCmd = &cli.Command{
}
var genesisSetActorVersionCmd = &cli.Command{
Name: "set-network-version",
Usage: "Set the version that this network will start from",
ArgsUsage: "<genesisFile> <actorVersion>",
Name: "set-network-version",
Usage: "Set the version that this network will start from",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "network-version",
Usage: "network version to start genesis with",
Value: int(build.GenesisNetworkVersion),
},
},
ArgsUsage: "<genesisFile>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 2 {
return fmt.Errorf("must specify genesis file and network version (e.g. '0'")
if cctx.Args().Len() != 1 {
return fmt.Errorf("must specify genesis file")
}
genf, err := homedir.Expand(cctx.Args().First())
@ -531,16 +538,12 @@ var genesisSetActorVersionCmd = &cli.Command{
return xerrors.Errorf("unmarshal genesis template: %w", err)
}
nv, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return xerrors.Errorf("parsing network version: %w", err)
}
if nv > uint64(build.NewestNetworkVersion) {
nv := network.Version(cctx.Int("network-version"))
if nv > build.NewestNetworkVersion {
return xerrors.Errorf("invalid network version: %d", nv)
}
template.NetworkVersion = network.Version(nv)
template.NetworkVersion = nv
b, err = json.MarshalIndent(&template, "", " ")
if err != nil {

104
cmd/lotus-shed/diff.go Normal file
View File

@ -0,0 +1,104 @@
package main
import (
"fmt"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)
var diffCmd = &cli.Command{
Name: "diff",
Usage: "diff state objects",
Subcommands: []*cli.Command{diffStateTrees},
}
var diffStateTrees = &cli.Command{
Name: "state-trees",
Usage: "diff two state-trees",
ArgsUsage: "<state-tree-a> <state-tree-b>",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.NArg() != 2 {
return xerrors.Errorf("expected two state-tree roots")
}
argA := cctx.Args().Get(1)
rootA, err := cid.Parse(argA)
if err != nil {
return xerrors.Errorf("first state-tree root (%q) is not a CID: %w", argA, err)
}
argB := cctx.Args().Get(1)
rootB, err := cid.Parse(argB)
if err != nil {
return xerrors.Errorf("second state-tree root (%q) is not a CID: %w", argB, err)
}
if rootA == rootB {
fmt.Println("state trees do not differ")
return nil
}
changedB, err := api.StateChangedActors(ctx, rootA, rootB)
if err != nil {
return err
}
changedA, err := api.StateChangedActors(ctx, rootB, rootA)
if err != nil {
return err
}
diff := func(stateA, stateB types.Actor) {
if stateB.Code != stateA.Code {
fmt.Printf(" code: %s != %s\n", stateA.Code, stateB.Code)
}
if stateB.Head != stateA.Head {
fmt.Printf(" state: %s != %s\n", stateA.Head, stateB.Head)
}
if stateB.Nonce != stateA.Nonce {
fmt.Printf(" nonce: %d != %d\n", stateA.Nonce, stateB.Nonce)
}
if !stateB.Balance.Equals(stateA.Balance) {
fmt.Printf(" balance: %s != %s\n", stateA.Balance, stateB.Balance)
}
}
fmt.Printf("state differences between %s (first) and %s (second):\n\n", rootA, rootB)
for addr, stateA := range changedA {
fmt.Println(addr)
stateB, ok := changedB[addr]
if ok {
diff(stateA, stateB)
continue
} else {
fmt.Printf(" actor does not exist in second state-tree (%s)\n", rootB)
}
fmt.Println()
delete(changedB, addr)
}
for addr, stateB := range changedB {
fmt.Println(addr)
stateA, ok := changedA[addr]
if ok {
diff(stateA, stateB)
continue
} else {
fmt.Printf(" actor does not exist in first state-tree (%s)\n", rootA)
}
fmt.Println()
}
return nil
},
}

View File

@ -68,6 +68,7 @@ func main() {
sendCsvCmd,
terminationsCmd,
migrationsCmd,
diffCmd,
}
app := &cli.App{

View File

@ -1621,14 +1621,15 @@ USAGE:
lotus-miner sectors list [command options] [arguments...]
OPTIONS:
--show-removed, -r show removed sectors (default: false)
--color, -c use color in display output (default: depends on output being a TTY)
--fast, -f don't show on-chain info for better performance (default: false)
--events, -e display number of events the sector has received (default: false)
--seal-time display how long it took for the sector to be sealed (default: false)
--states value filter sectors by a comma-separated list of states
--unproven, -u only show sectors which aren't in the 'Proving' state (default: false)
--help, -h show help (default: false)
--show-removed, -r show removed sectors (default: false)
--color, -c use color in display output (default: depends on output being a TTY)
--fast, -f don't show on-chain info for better performance (default: false)
--events, -e display number of events the sector has received (default: false)
--initial-pledge, -p display initial pledge (default: false)
--seal-time, -t display how long it took for the sector to be sealed (default: false)
--states value filter sectors by a comma-separated list of states
--unproven, -u only show sectors which aren't in the 'Proving' state (default: false)
--help, -h show help (default: false)
```

View File

@ -46,6 +46,7 @@ OPTIONS:
--commit enable commit (32G sectors: all cores or GPUs, 128GiB Memory + 64GiB swap) (default: true)
--replica-update enable replica update (default: true)
--prove-replica-update2 enable prove replica update 2 (default: true)
--regen-sector-key enable regen sector key (default: true)
--parallel-fetch-limit value maximum fetch operations to run in parallel (default: 5)
--timeout value used when 'listen' is unspecified. must be a valid duration recognized by golang's time.ParseDuration function (default: "30m")
--help, -h show help (default: false)
@ -170,7 +171,7 @@ NAME:
lotus-worker tasks enable - Enable a task type
USAGE:
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|AP]
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
OPTIONS:
--help, -h show help (default: false)
@ -183,7 +184,7 @@ NAME:
lotus-worker tasks disable - Disable a task type
USAGE:
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|AP]
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
OPTIONS:
--help, -h show help (default: false)

View File

@ -438,6 +438,9 @@
# env var: LOTUS_STORAGE_ALLOWPROVEREPLICAUPDATE2
#AllowProveReplicaUpdate2 = true
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true
# env var: LOTUS_STORAGE_RESOURCEFILTERING
#ResourceFiltering = "hardware"

View File

@ -105,6 +105,7 @@ type SealerConfig struct {
AllowUnseal bool
AllowReplicaUpdate bool
AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
@ -169,6 +170,9 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
if sc.AllowProveReplicaUpdate2 {
localTasks = append(localTasks, sealtasks.TTProveReplicaUpdate2)
}
if sc.AllowRegenSectorKey {
localTasks = append(localTasks, sealtasks.TTRegenSectorKey)
}
wcfg := WorkerConfig{
IgnoreResourceFiltering: sc.ResourceFiltering == ResourceFilteringDisabled,

View File

@ -214,8 +214,13 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf
if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
}
if si.UpdateUnsealed == nil || !commD.Equals(*si.UpdateUnsealed) {
return &ErrBadRU{xerrors.Errorf("on chain CommD differs from sector: %s != %s", commD, si.CommD)}
if si.UpdateUnsealed == nil {
return &ErrBadRU{xerrors.New("nil UpdateUnsealed cid after replica update")}
}
if !commD.Equals(*si.UpdateUnsealed) {
return &ErrBadRU{xerrors.Errorf("calculated CommD differs from updated replica: %s != %s", commD, *si.UpdateUnsealed)}
}
if si.UpdateSealed == nil {

View File

@ -108,7 +108,8 @@ func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, to
if err != nil {
return false, xerrors.Errorf("failed to check active sectors: %w", err)
}
// Check if sector is among active sectors
// Ensure the upgraded sector is active
var found bool
for _, si := range active {
if si.SectorNumber == sector {

View File

@ -51,6 +51,7 @@ type TargetAPI interface {
MpoolPushUntrusted(ctx context.Context, sm *types.SignedMessage) (cid.Cid, error)
MsigGetAvailableBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (types.BigInt, error)
MsigGetVested(ctx context.Context, addr address.Address, start types.TipSetKey, end types.TipSetKey) (types.BigInt, error)
MsigGetVestingSchedule(context.Context, address.Address, types.TipSetKey) (api.MsigVesting, error)
MsigGetPending(ctx context.Context, addr address.Address, ts types.TipSetKey) ([]*api.MsigTransaction, error)
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (api.DealCollateralBounds, error)
@ -282,6 +283,13 @@ func (gw *Node) MsigGetVested(ctx context.Context, addr address.Address, start t
return gw.target.MsigGetVested(ctx, addr, start, end)
}
func (gw *Node) MsigGetVestingSchedule(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MsigVesting, error) {
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return api.MsigVesting{}, err
}
return gw.target.MsigGetVestingSchedule(ctx, addr, tsk)
}
func (gw *Node) MsigGetPending(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*api.MsigTransaction, error) {
if err := gw.checkTipsetKey(ctx, tsk); err != nil {
return nil, err

8
go.mod
View File

@ -2,6 +2,8 @@ module github.com/filecoin-project/lotus
go 1.16
retract v1.14.0 // Accidentally force-pushed tag, use v1.14.1+ instead.
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0
github.com/BurntSushi/toml v0.4.1
@ -108,7 +110,7 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.18.0-rc4
github.com/libp2p/go-libp2p v0.18.0-rc5
github.com/libp2p/go-libp2p-connmgr v0.3.1 // indirect
github.com/libp2p/go-libp2p-core v0.14.0
github.com/libp2p/go-libp2p-discovery v0.6.0
@ -118,9 +120,9 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/libp2p/go-libp2p-quic-transport v0.16.1
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.4
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.10.1
github.com/libp2p/go-libp2p-swarm v0.10.2
github.com/libp2p/go-libp2p-tls v0.3.1
github.com/libp2p/go-libp2p-yamux v0.8.2
github.com/libp2p/go-maddr-filter v0.1.0

14
go.sum
View File

@ -995,8 +995,8 @@ github.com/libp2p/go-libp2p v0.14.4/go.mod h1:EIRU0Of4J5S8rkockZM7eJp2S0UrCyi55m
github.com/libp2p/go-libp2p v0.16.0/go.mod h1:ump42BsirwAWxKzsCiFnTtN1Yc+DuPu76fyMX364/O4=
github.com/libp2p/go-libp2p v0.17.0/go.mod h1:Fkin50rsGdv5mm5BshBUtPRZknt9esfmYXBOYcwOTgw=
github.com/libp2p/go-libp2p v0.18.0-rc1/go.mod h1:RgYlH7IIWHXREimC92bw5Lg1V2R5XmSzuLHb5fTnr+8=
github.com/libp2p/go-libp2p v0.18.0-rc4 h1:OUsSbeu7q+Ck/bV9wHDxFzb08ORqBupHhpCmRBhWrJ8=
github.com/libp2p/go-libp2p v0.18.0-rc4/go.mod h1:wzmsk1ioOq9FGQys2BN5BIw4nugP6+R+CyW3JbPEbbs=
github.com/libp2p/go-libp2p v0.18.0-rc5 h1:88wWDHb9nNo0vBNCupLde3OTnFAkugOCNkrDfl3ivK4=
github.com/libp2p/go-libp2p v0.18.0-rc5/go.mod h1:aZPS5l84bDvCvP4jkyEUT/J6YOpUq33Fgqrs3K59mpI=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E=
github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
@ -1158,8 +1158,9 @@ github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGd
github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs28kCIfql0=
github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4=
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.3 h1:Umf0tW6WNXSb6Uoma0YT56azB5iikL/aeGAP7s7+f5o=
github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw=
github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys=
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY=
@ -1181,8 +1182,8 @@ github.com/libp2p/go-libp2p-swarm v0.5.3/go.mod h1:NBn7eNW2lu568L7Ns9wdFrOhgRlkR
github.com/libp2p/go-libp2p-swarm v0.8.0/go.mod h1:sOMp6dPuqco0r0GHTzfVheVBh6UEL0L1lXUZ5ot2Fvc=
github.com/libp2p/go-libp2p-swarm v0.9.0/go.mod h1:2f8d8uxTJmpeqHF/1ujjdXZp+98nNIbujVOMEZxCbZ8=
github.com/libp2p/go-libp2p-swarm v0.10.0/go.mod h1:71ceMcV6Rg/0rIQ97rsZWMzto1l9LnNquef+efcRbmA=
github.com/libp2p/go-libp2p-swarm v0.10.1 h1:lXW3pgGt+BVmkzcFX61erX7l6Lt+WAamNhwa2Kf3eJM=
github.com/libp2p/go-libp2p-swarm v0.10.1/go.mod h1:Pdkq0QU5a+qu+oyqIV3bknMsnzk9lnNyKvB9acJ5aZs=
github.com/libp2p/go-libp2p-swarm v0.10.2 h1:UaXf+CTq6Ns1N2V1EgqJ9Q3xaRsiN7ImVlDMpirMAWw=
github.com/libp2p/go-libp2p-swarm v0.10.2/go.mod h1:Pdkq0QU5a+qu+oyqIV3bknMsnzk9lnNyKvB9acJ5aZs=
github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.2/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
github.com/libp2p/go-libp2p-testing v0.0.3/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=
@ -1298,8 +1299,9 @@ github.com/libp2p/go-tcp-transport v0.2.3/go.mod h1:9dvr03yqrPyYGIEN6Dy5UvdJZjyP
github.com/libp2p/go-tcp-transport v0.2.4/go.mod h1:9dvr03yqrPyYGIEN6Dy5UvdJZjyPFvl1S/igQ5QD1SU=
github.com/libp2p/go-tcp-transport v0.2.7/go.mod h1:lue9p1b3VmZj1MhhEGB/etmvF/nBQ0X9CW2DutBT3MM=
github.com/libp2p/go-tcp-transport v0.4.0/go.mod h1:0y52Rwrn4076xdJYu/51/qJIdxz+EWDAOG2S45sV3VI=
github.com/libp2p/go-tcp-transport v0.5.0 h1:3ZPW8HAuyRAuFzyabE0hSrCXKKSWzROnZZX7DtcIatY=
github.com/libp2p/go-tcp-transport v0.5.0/go.mod h1:UPPL0DIjQqiWRwVAb+CEQlaAG0rp/mCqJfIhFcLHc4Y=
github.com/libp2p/go-tcp-transport v0.5.1 h1:edOOs688VLZAozWC7Kj5/6HHXKNwi9M6wgRmmLa8M6Q=
github.com/libp2p/go-tcp-transport v0.5.1/go.mod h1:UPPL0DIjQqiWRwVAb+CEQlaAG0rp/mCqJfIhFcLHc4Y=
github.com/libp2p/go-testutil v0.0.1/go.mod h1:iAcJc/DKJQanJ5ws2V+u5ywdL2n12X1WbbEG+Jjy69I=
github.com/libp2p/go-testutil v0.1.0/go.mod h1:81b2n5HypcVyrCg/MJx4Wgfp/VHojytjVe/gLzZ2Ehc=
github.com/libp2p/go-ws-transport v0.0.5/go.mod h1:Qbl4BxPfXXhhd/o0wcrgoaItHqA9tnZjoFZnxykuaXU=

View File

@ -3,6 +3,7 @@ package kit
import (
"bytes"
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
@ -10,6 +11,7 @@ import (
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/api"
aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool {
return uint64(len(p.partitions)) == p.count(t)
}
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) {
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) {
defer func() {
ret = p.done(t)
}()
msg := smsg.Message
if !(msg.To == bm.miner.ActorAddr) {
return
}
@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type
return
}
func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) {
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
if !tracker.done(bm.t) { // need to wait for post
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events
require.NoError(bm.t, err)
// First check pending messages we'll mine this epoch
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
require.NoError(bm.t, err)
for _, msg := range msgs {
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
fmt.Printf("found post in mempool pending\n")
}
}
// Account for included but not yet executed messages
for _, bc := range ts.Cids() {
msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc)
require.NoError(bm.t, err)
for _, msg := range msgs.BlsMessages {
if tracker.recordIfPost(bm.t, bm, msg) {
fmt.Printf("found post in message of prev tipset\n")
}
}
for _, msg := range msgs.SecpkMessages {
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
fmt.Printf("found post in message of prev tipset\n")
}
}
}
// post not yet in mpool, wait for it
if !tracker.done(bm.t) {
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
POOL:
for {
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
select {
case <-ctx.Done():
return
case evt := <-poolEvts:
bm.t.Logf("pool event: %d", evt.Type)
if evt.Type == api.MpoolAdd {
bm.t.Logf("incoming message %v", evt.Message)
if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) {
fmt.Printf("found post in mempool evt\n")
break POOL
}
}
}
}
bm.t.Logf("done waiting on mpool")
}
}
}
// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
// and everything shuts down if a post fails. It also enforces that every block mined succeeds
func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) {
time.Sleep(3 * time.Second)
time.Sleep(time.Second)
// wrap context in a cancellable context.
ctx, bm.cancel = context.WithCancel(ctx)
@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
go func() {
defer bm.wg.Done()
activeDeadlines := make(map[int]struct{})
_ = activeDeadlines
ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err)
wait := make(chan bool)
@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
require.NoError(bm.t, err)
// read current out
curr := <-chg
require.Equal(bm.t, ts.Height(), curr[0].Val.Height())
require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?")
for {
select {
case <-time.After(blocktime):
@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
return
}
nulls := atomic.SwapInt64(&bm.nextNulls, 0)
require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported")
// Wake up and figure out if we are at the end of an active deadline
ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err)
tsk := ts.Key()
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk)
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key())
require.NoError(bm.t, err)
if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
if !tracker.done(bm.t) { // need to wait for post
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx)
require.NoError(bm.t, err)
// First check pending messages we'll mine this epoch
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
require.NoError(bm.t, err)
for _, msg := range msgs {
tracker.recordIfPost(bm.t, bm, msg)
}
// post not yet in mpool, wait for it
if !tracker.done(bm.t) {
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
POOL:
for {
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
select {
case <-ctx.Done():
return
case evt := <-poolEvts:
bm.t.Logf("pool event: %d", evt.Type)
if evt.Type == api.MpoolAdd {
bm.t.Logf("incoming message %v", evt.Message)
if tracker.recordIfPost(bm.t, bm, evt.Message) {
break POOL
}
}
}
}
bm.t.Logf("done waiting on mpool")
}
}
if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
bm.forcePoSt(ctx, ts, dlinfo)
}
var target abi.ChainEpoch
@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
Done: reportSuccessFn,
})
success = <-wait
if !success {
// if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
bm.forcePoSt(ctx, ts, dlinfo)
}
}
}
// Wait until it shows up on the given full nodes ChainHead

34
itests/kit/circuit.go Normal file
View File

@ -0,0 +1,34 @@
package kit
import (
"fmt"
"testing"
"time"
)
/*
CircuitBreaker implements a simple time-based circuit breaker used for waiting for async operations to finish.
This is how it works:
- It runs the `cb` function until it returns true,
- waiting for `throttle` duration between each iteration,
- or at most `timeout` duration until it breaks test execution.
You can use it if t.Deadline() is not "granular" enough, and you want to know which specific piece of code timed out,
or you need to set different deadlines in the same test.
*/
func CircuitBreaker(t *testing.T, label string, throttle, timeout time.Duration, cb func() bool) {
tmo := time.After(timeout)
for {
if cb() {
break
}
select {
case <-tmo:
t.Fatal("timeout: ", label)
default:
fmt.Printf("waiting: %s\n", label)
time.Sleep(throttle)
}
}
}

View File

@ -151,6 +151,11 @@ func NewEnsemble(t *testing.T, opts ...EnsembleOpt) *Ensemble {
return n
}
// Mocknet returns the underlying mocknet.
func (n *Ensemble) Mocknet() mocknet.Mocknet {
return n.mn
}
// FullNode enrolls a new full node.
func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble {
options := DefaultNodeOpts

521
itests/mempool_test.go Normal file
View File

@ -0,0 +1,521 @@
//stm: #integration
package itests
import (
"context"
"testing"
"time"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/stretchr/testify/require"
)
const mPoolThrottle = time.Millisecond * 100
const mPoolTimeout = time.Second * 10
func TestMemPoolPushSingleNode(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, err := firstNode.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm, err := firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm.Message.Nonce)
sms = append(sms, sm)
}
// check pending messages for address
kit.CircuitBreaker(t, "push messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushTwoNodes(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, secondNode, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
sender2 := secondNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
addr2, _ := secondNode.WalletNew(ctx, types.KTBLS)
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
const totalMessages = 10
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
var sms []*types.SignedMessage
// push messages to message pools of both nodes
for i := 0; i < totalMessages; i++ {
// first
msg1 := &types.Message{
From: sender,
To: addr,
Value: each,
}
sm1, err := firstNode.MpoolPushMessage(ctx, msg1, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm1.Message.Nonce)
sms = append(sms, sm1)
// second
msg2 := &types.Message{
From: sender2,
To: addr2,
Value: each,
}
sm2, err := secondNode.MpoolPushMessage(ctx, msg2, nil)
require.NoError(t, err)
require.EqualValues(t, i, sm2.Message.Nonce)
sms = append(sms, sm2)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "push & mine messages", mPoolThrottle, mPoolTimeout, func() bool {
pending1, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
pending2, err := secondNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending1) == 0 && len(pending2) == 0 {
// Check messages on both nodes
for _, lookMsg := range sms {
msgLookup1, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup1)
msgLookup2, err := secondNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup2)
}
return true
}
return false
})
}
func TestMemPoolClearPending(t *testing.T) {
//stm: @CHAIN_MEMPOOL_PUSH_001, @CHAIN_MEMPOOL_PENDING_001
//stm: @CHAIN_STATE_WAIT_MSG_001, @CHAIN_MEMPOOL_CLEAR_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// Add single message, then clear the pool
msg := &types.Message{
From: sender,
To: addr,
Value: each,
}
_, err = firstNode.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
// message should be in the mempool
kit.CircuitBreaker(t, "push message", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
return len(pending) == 1
})
err = firstNode.MpoolClear(ctx, true)
require.NoError(t, err)
// pool should be empty now
kit.CircuitBreaker(t, "clear mempool", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
return len(pending) == 0
})
// mine a couple of blocks
ens.BeginMining(blockTime)
time.Sleep(5 * blockTime)
// make sure that the cleared message wasn't picked up and mined
_, err = firstNode.StateWaitMsg(ctx, msg.Cid(), 3, api.LookbackNoLimit, true)
require.Error(t, err)
}
func TestMemPoolBatchPush(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_001
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPush(ctx, sms)
require.NoError(t, err)
// check pending messages for address
kit.CircuitBreaker(t, "batch push", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
require.NoError(t, err)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolPushSingleNodeUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_PUSH_003
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
// push untrusted messages
pushedCid, err := firstNode.MpoolPushUntrusted(ctx, signedMessage)
require.NoError(t, err)
require.Equal(t, msg.Cid(), pushedCid)
sms = append(sms, signedMessage)
}
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// check pending messages for address
msgStatuses, _ := firstNode.MpoolCheckPendingMessages(ctx, sender)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
// pool pending list should be empty
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}
func TestMemPoolBatchPushUntrusted(t *testing.T) {
//stm: @CHAIN_MEMPOOL_CREATE_MSG_CHAINS_001, @CHAIN_MEMPOOL_SELECT_001, @CHAIN_MEMPOOL_CAP_GAS_FEE_001
//stm: @CHAIN_MEMPOOL_CHECK_PENDING_MESSAGES_001, @CHAIN_MEMPOOL_SELECT_001
//stm: @CHAIN_MEMPOOL_PENDING_001, @CHAIN_STATE_WAIT_MSG_001
//stm: @CHAIN_MEMPOOL_BATCH_PUSH_002
ctx := context.Background()
const blockTime = 100 * time.Millisecond
firstNode, _, _, ens := kit.EnsembleTwoOne(t, kit.MockProofs())
ens.InterconnectAll()
kit.QuietMiningLogs()
sender := firstNode.DefaultKey.Address
addr, _ := firstNode.WalletNew(ctx, types.KTBLS)
const totalMessages = 10
bal, err := firstNode.WalletBalance(ctx, sender)
require.NoError(t, err)
toSend := big.Div(bal, big.NewInt(10))
each := big.Div(toSend, big.NewInt(totalMessages))
// add messages to be mined/published
var sms []*types.SignedMessage
for i := 0; i < totalMessages; i++ {
msg := &types.Message{
From: sender,
To: addr,
Value: each,
Nonce: uint64(i),
GasLimit: 50_000_000,
GasFeeCap: types.NewInt(100_000_000),
GasPremium: types.NewInt(1),
}
signedMessage, err := firstNode.WalletSignMessage(ctx, sender, msg)
require.NoError(t, err)
sms = append(sms, signedMessage)
}
_, err = firstNode.MpoolBatchPushUntrusted(ctx, sms)
require.NoError(t, err)
// check pending messages for address, wait until they are all pushed
kit.CircuitBreaker(t, "push untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
msgStatuses, err := firstNode.MpoolCheckPendingMessages(ctx, sender)
require.NoError(t, err)
if len(msgStatuses) == totalMessages {
for _, msgStatusList := range msgStatuses {
for _, status := range msgStatusList {
require.True(t, status.OK)
}
}
return true
}
return false
})
// verify messages should be the ones included in the next block
selected, _ := firstNode.MpoolSelect(ctx, types.EmptyTSK, 0)
for _, msg := range sms {
found := false
for _, selectedMsg := range selected {
if selectedMsg.Cid() == msg.Cid() {
found = true
break
}
}
require.True(t, found)
}
ens.BeginMining(blockTime)
// wait until pending messages are mined, pool pending list should be empty
kit.CircuitBreaker(t, "mine untrusted messages", mPoolThrottle, mPoolTimeout, func() bool {
pending, err := firstNode.MpoolPending(context.TODO(), types.EmptyTSK)
require.NoError(t, err)
if len(pending) == 0 {
// all messages should be added to the chain
for _, lookMsg := range sms {
msgLookup, err := firstNode.StateWaitMsg(ctx, lookMsg.Cid(), 3, api.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, msgLookup)
}
return true
}
return false
})
}

View File

@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) {
Miner(&miner, &paymentCreator, kit.WithAllSubsystems()).
Start().
InterconnectAll()
bms := ens.BeginMining(blockTime)
bms := ens.BeginMiningMustPost(blockTime)
bm := bms[0]
// send some funds to register the receiver

View File

@ -47,6 +47,12 @@ var (
WorkerHostname, _ = tag.NewKey("worker_hostname")
StorageID, _ = tag.NewKey("storage_id")
SectorState, _ = tag.NewKey("sector_state")
// rcmgr
ServiceID, _ = tag.NewKey("svc")
ProtocolID, _ = tag.NewKey("proto")
Direction, _ = tag.NewKey("direction")
UseFD, _ = tag.NewKey("use_fd")
)
// Measures
@ -143,6 +149,22 @@ var (
SplitstoreCompactionHot = stats.Int64("splitstore/hot", "Number of hot blocks in last compaction", stats.UnitDimensionless)
SplitstoreCompactionCold = stats.Int64("splitstore/cold", "Number of cold blocks in last compaction", stats.UnitDimensionless)
SplitstoreCompactionDead = stats.Int64("splitstore/dead", "Number of dead blocks in last compaction", stats.UnitDimensionless)
// rcmgr
RcmgrAllowConn = stats.Int64("rcmgr/allow_conn", "Number of allowed connections", stats.UnitDimensionless)
RcmgrBlockConn = stats.Int64("rcmgr/block_conn", "Number of blocked connections", stats.UnitDimensionless)
RcmgrAllowStream = stats.Int64("rcmgr/allow_stream", "Number of allowed streams", stats.UnitDimensionless)
RcmgrBlockStream = stats.Int64("rcmgr/block_stream", "Number of blocked streams", stats.UnitDimensionless)
RcmgrAllowPeer = stats.Int64("rcmgr/allow_peer", "Number of allowed peer connections", stats.UnitDimensionless)
RcmgrBlockPeer = stats.Int64("rcmgr/block_peer", "Number of blocked peer connections", stats.UnitDimensionless)
RcmgrAllowProto = stats.Int64("rcmgr/allow_proto", "Number of allowed streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProto = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol", stats.UnitDimensionless)
RcmgrBlockProtoPeer = stats.Int64("rcmgr/block_proto", "Number of blocked blocked streams attached to a protocol for a specific peer", stats.UnitDimensionless)
RcmgrAllowSvc = stats.Int64("rcmgr/allow_svc", "Number of allowed streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvc = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service", stats.UnitDimensionless)
RcmgrBlockSvcPeer = stats.Int64("rcmgr/block_svc", "Number of blocked blocked streams attached to a service for a specific peer", stats.UnitDimensionless)
RcmgrAllowMem = stats.Int64("rcmgr/allow_mem", "Number of allowed memory reservations", stats.UnitDimensionless)
RcmgrBlockMem = stats.Int64("rcmgr/block_mem", "Number of blocked memory reservations", stats.UnitDimensionless)
)
var (
@ -496,6 +518,76 @@ var (
Measure: GraphsyncSendingPeersPending,
Aggregation: view.LastValue(),
}
// rcmgr
RcmgrAllowConnView = &view.View{
Measure: RcmgrAllowConn,
Aggregation: view.Count(),
TagKeys: []tag.Key{Direction, UseFD},
}
RcmgrBlockConnView = &view.View{
Measure: RcmgrBlockConn,
Aggregation: view.Count(),
TagKeys: []tag.Key{Direction, UseFD},
}
RcmgrAllowStreamView = &view.View{
Measure: RcmgrAllowStream,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID, Direction},
}
RcmgrBlockStreamView = &view.View{
Measure: RcmgrBlockStream,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID, Direction},
}
RcmgrAllowPeerView = &view.View{
Measure: RcmgrAllowPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
}
RcmgrBlockPeerView = &view.View{
Measure: RcmgrBlockPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
}
RcmgrAllowProtoView = &view.View{
Measure: RcmgrAllowProto,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID},
}
RcmgrBlockProtoView = &view.View{
Measure: RcmgrBlockProto,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID},
}
RcmgrBlockProtoPeerView = &view.View{
Measure: RcmgrBlockProtoPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{ProtocolID, PeerID},
}
RcmgrAllowSvcView = &view.View{
Measure: RcmgrAllowSvc,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID},
}
RcmgrBlockSvcView = &view.View{
Measure: RcmgrBlockSvc,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID},
}
RcmgrBlockSvcPeerView = &view.View{
Measure: RcmgrBlockSvcPeer,
Aggregation: view.Count(),
TagKeys: []tag.Key{ServiceID, PeerID},
}
RcmgrAllowMemView = &view.View{
Measure: RcmgrAllowMem,
Aggregation: view.Count(),
}
RcmgrBlockMemView = &view.View{
Measure: RcmgrBlockMem,
Aggregation: view.Count(),
}
)
// DefaultViews is an array of OpenCensus views for metric gathering purposes
@ -517,6 +609,21 @@ var DefaultViews = func() []*view.View {
GraphsyncSendingTotalMemoryAllocatedView,
GraphsyncSendingTotalPendingAllocationsView,
GraphsyncSendingPeersPendingView,
RcmgrAllowConnView,
RcmgrBlockConnView,
RcmgrAllowStreamView,
RcmgrBlockStreamView,
RcmgrAllowPeerView,
RcmgrBlockPeerView,
RcmgrAllowProtoView,
RcmgrBlockProtoView,
RcmgrBlockProtoPeerView,
RcmgrAllowSvcView,
RcmgrBlockSvcView,
RcmgrBlockSvcPeerView,
RcmgrAllowMemView,
RcmgrBlockMemView,
}
views = append(views, blockstore.DefaultViews...)
views = append(views, rpcmetrics.DefaultViews...)

View File

@ -139,6 +139,7 @@ func DefaultStorageMiner() *StorageMiner {
AllowUnseal: true,
AllowReplicaUpdate: true,
AllowProveReplicaUpdate2: true,
AllowRegenSectorKey: true,
// Default to 10 - tcp should still be able to figure this out, and
// it's the ratio between 10gbit / 1gbit

View File

@ -11,9 +11,15 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/repo"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
)
func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
@ -43,6 +49,8 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceMan
// TODO: also set appropriate default limits for lotus protocols
libp2p.SetDefaultServiceLimits(limiter)
opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{}))
if os.Getenv("LOTUS_DEBUG_RCMGR") != "" {
debugPath := filepath.Join(repoPath, "debug")
if err := os.MkdirAll(debugPath, 0755); err != nil {
@ -70,3 +78,109 @@ func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
Opts: []libp2p.Option{libp2p.ResourceManager(mgr)},
}
}
type rcmgrMetrics struct{}
func (r rcmgrMetrics) AllowConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
}
stats.Record(ctx, metrics.RcmgrAllowConn.M(1))
}
func (r rcmgrMetrics) BlockConn(dir network.Direction, usefd bool) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
if usefd {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "true"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.UseFD, "false"))
}
stats.Record(ctx, metrics.RcmgrBlockConn.M(1))
}
func (r rcmgrMetrics) AllowStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
stats.Record(ctx, metrics.RcmgrAllowStream.M(1))
}
func (r rcmgrMetrics) BlockStream(p peer.ID, dir network.Direction) {
ctx := context.Background()
if dir == network.DirInbound {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "inbound"))
} else {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Direction, "outbound"))
}
stats.Record(ctx, metrics.RcmgrBlockStream.M(1))
}
func (r rcmgrMetrics) AllowPeer(p peer.ID) {
ctx := context.Background()
stats.Record(ctx, metrics.RcmgrAllowPeer.M(1))
}
func (r rcmgrMetrics) BlockPeer(p peer.ID) {
ctx := context.Background()
stats.Record(ctx, metrics.RcmgrBlockPeer.M(1))
}
func (r rcmgrMetrics) AllowProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrAllowProto.M(1))
}
func (r rcmgrMetrics) BlockProtocol(proto protocol.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProto.M(1))
}
func (r rcmgrMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ProtocolID, string(proto)))
stats.Record(ctx, metrics.RcmgrBlockProtoPeer.M(1))
}
func (r rcmgrMetrics) AllowService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrAllowSvc.M(1))
}
func (r rcmgrMetrics) BlockService(svc string) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvc.M(1))
}
func (r rcmgrMetrics) BlockServicePeer(svc string, p peer.ID) {
ctx := context.Background()
ctx, _ = tag.New(ctx, tag.Upsert(metrics.ServiceID, svc))
stats.Record(ctx, metrics.RcmgrBlockSvcPeer.M(1))
}
func (r rcmgrMetrics) AllowMemory(size int) {
stats.Record(context.Background(), metrics.RcmgrAllowMem.M(1))
}
func (r rcmgrMetrics) BlockMemory(size int) {
stats.Record(context.Background(), metrics.RcmgrBlockMem.M(1))
}

View File

@ -473,8 +473,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/g
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 h1:FuDaXIbcw2hRsFI8SDTmsGGCE+NumpF6aiBoU/2X5W4=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 h1:oUYOvF7EvdXS0Zmk9mNkaB6Bu0l+WXBYPzVodKMiLug=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/specs-storage v0.2.0 h1:Y4UDv0apRQ3zI2GiPPubi8JblpUZZphEdaJUxCutfyg=
github.com/filecoin-project/specs-storage v0.2.0/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=