Merge branch 'master' of github.com:protofire/lotus

This commit is contained in:
Ales Dumikau 2023-03-10 10:14:18 +03:00
commit 8a3f91e655
No known key found for this signature in database
GPG Key ID: 62AFC370B8532483
42 changed files with 694 additions and 160 deletions

View File

@ -401,6 +401,10 @@ func init() {
FromBlock: pstring("2301220"),
Address: []ethtypes.EthAddress{ethaddr},
})
percent := types.Percent(123)
addExample(percent)
addExample(&percent)
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -115,6 +115,23 @@ type Config struct {
// A positive value is the number of compactions before a full GC is performed;
// a value of 1 will perform full GC in every compaction.
HotStoreFullGCFrequency uint64
// HotstoreMaxSpaceTarget suggests the max allowed space the hotstore can take.
// This is not a hard limit, it is possible for the hotstore to exceed the target
// for example if state grows massively between compactions. The splitstore
// will make a best effort to avoid overflowing the target and in practice should
// never overflow. This field is used when doing GC at the end of a compaction to
// adaptively choose moving GC
HotstoreMaxSpaceTarget uint64
// Moving GC will be triggered when total moving size exceeds
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
HotstoreMaxSpaceThreshold uint64
// Safety buffer to prevent moving GC from overflowing disk.
// Moving GC will not occur when total moving size exceeds
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
HotstoreMaxSpaceSafetyBuffer uint64
}
// ChainAccessor allows the Splitstore to access the chain. It will most likely
@ -165,6 +182,7 @@ type SplitStore struct {
compactionIndex int64
pruneIndex int64
onlineGCCnt int64
ctx context.Context
cancel func()
@ -195,6 +213,17 @@ type SplitStore struct {
// registered protectors
protectors []func(func(cid.Cid) error) error
// dag sizes measured during latest compaction
// logged and used for GC strategy
// protected by compaction lock
szWalk int64
szProtectedTxns int64
szKeys int64 // approximate, not counting keys protected when entering critical section
// protected by txnLk
szMarkedLiveRefs int64
}
var _ bstore.Blockstore = (*SplitStore)(nil)

View File

@ -95,7 +95,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
}
defer visitor.Close() //nolint
err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
size := s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
@ -133,7 +133,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
return err
}
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt, "walk size", size)
write("--")
write("cold: %d missing: %d", *coldCnt, *missingCnt)
write("DONE")

View File

@ -66,7 +66,8 @@ var (
)
const (
batchSize = 16384
batchSize = 16384
cidKeySize = 128
)
func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
@ -199,9 +200,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Debugf("marking %d live refs", len(cids))
startMark := time.Now()
szMarked := new(int64)
count := new(int32)
visitor := newConcurrentVisitor()
walkObject := func(c cid.Cid) error {
walkObject := func(c cid.Cid) (int64, error) {
return s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
@ -228,10 +231,12 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
// optimize the common case of single put
if len(cids) == 1 {
if err := walkObject(cids[0]); err != nil {
sz, err := walkObject(cids[0])
if err != nil {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
atomic.AddInt64(szMarked, sz)
return
}
@ -243,9 +248,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
worker := func() error {
for c := range workch {
if err := walkObject(c); err != nil {
sz, err := walkObject(c)
if err != nil {
return err
}
atomic.AddInt64(szMarked, sz)
}
return nil
@ -268,7 +275,8 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
log.Errorf("error marking tipset refs: %s", err)
}
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count, "size marked", *szMarked)
s.szMarkedLiveRefs += atomic.LoadInt64(szMarked)
}
// transactionally protect a view
@ -361,6 +369,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
log.Infow("protecting transactional references", "refs", len(txnRefs))
count := 0
sz := new(int64)
workch := make(chan cid.Cid, len(txnRefs))
startProtect := time.Now()
@ -393,10 +402,11 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
worker := func() error {
for c := range workch {
err := s.doTxnProtect(c, markSet)
szTxn, err := s.doTxnProtect(c, markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
}
atomic.AddInt64(sz, szTxn)
}
return nil
}
@ -409,16 +419,16 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
if err := g.Wait(); err != nil {
return err
}
log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
s.szProtectedTxns += atomic.LoadInt64(sz)
log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count, "protected size", sz)
}
}
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
if err := s.checkClosing(); err != nil {
return err
return 0, err
}
// Note: cold objects are deleted heaviest first, so the consituents of an object
@ -509,6 +519,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// might be potentially inconsistent; abort compaction and notify the user to intervene.
return xerrors.Errorf("checkpoint exists; aborting compaction")
}
s.clearSizeMeasurements()
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary
@ -598,7 +609,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
if err != nil {
return xerrors.Errorf("error marking: %w", err)
}
@ -638,7 +648,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
defer purgew.Close() //nolint:errcheck
// some stats for logging
var hotCnt, coldCnt, purgeCnt int
var hotCnt, coldCnt, purgeCnt int64
err = s.hot.ForEachKey(func(c cid.Cid) error {
// was it marked?
mark, err := markSet.Has(c)
@ -690,8 +700,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("cold collection done", "took", time.Since(startCollect))
log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt)
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
s.szKeys = hotCnt * cidKeySize
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))
if err := s.checkClosing(); err != nil {
return err
@ -773,8 +784,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error purging cold objects: %w", err)
}
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
s.endCriticalSection()
log.Infow("critical section done", "total protected size", s.szProtectedTxns, "total marked live size", s.szMarkedLiveRefs)
if err := checkpoint.Close(); err != nil {
log.Warnf("error closing checkpoint: %s", err)
@ -907,6 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
copy(toWalk, ts.Cids())
walkCnt := new(int64)
scanCnt := new(int64)
szWalk := new(int64)
tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
return types.NewTipSetKey(blkCids...).Cid()
@ -942,48 +954,64 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if err != nil {
return xerrors.Errorf("error computing cid reference to parent tipset")
}
if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil {
sz, err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk)
if err != nil {
return xerrors.Errorf("error walking parent tipset cid reference")
}
atomic.AddInt64(szWalk, sz)
// message are retained if within the inclMsgs boundary
if hdr.Height >= inclMsgs && hdr.Height > 0 {
if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil {
sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk)
if err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
atomic.AddInt64(szWalk, sz)
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil {
sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk)
if err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
atomic.AddInt64(szWalk, sz)
} else {
if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil {
sz, err = s.walkObject(hdr.Messages, visitor, fHot)
if err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
atomic.AddInt64(szWalk, sz)
if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil {
sz, err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot)
if err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
atomic.AddInt64(szWalk, sz)
}
}
// messages and receipts outside of inclMsgs are included in the cold store
if hdr.Height < inclMsgs && hdr.Height > 0 {
if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil {
sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk)
if err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil {
atomic.AddInt64(szWalk, sz)
sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk)
if err != nil {
return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
atomic.AddInt64(szWalk, sz)
}
// state is only retained if within the inclState boundary, with the exception of genesis
if hdr.Height >= inclState || hdr.Height == 0 {
if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil {
sz, err := s.walkObject(hdr.ParentStateRoot, visitor, fHot)
if err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
atomic.AddInt64(szWalk, sz)
atomic.AddInt64(scanCnt, 1)
}
@ -1001,9 +1029,11 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
if err != nil {
return xerrors.Errorf("error computing cid reference to parent tipset")
}
if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil {
sz, err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk)
if err != nil {
return xerrors.Errorf("error walking parent tipset cid reference")
}
atomic.AddInt64(szWalk, sz)
for len(toWalk) > 0 {
// walking can take a while, so check this with every opportunity
@ -1047,123 +1077,129 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
}
}
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt)
log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt, "walk size", szWalk)
s.szWalk = atomic.LoadInt64(szWalk)
return nil
}
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) (int64, error) {
var sz int64
visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
return 0, xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil
return sz, nil
}
if err := f(c); err != nil {
if err == errStopWalk {
return nil
return sz, nil
}
return err
return 0, err
}
if c.Prefix().Codec != cid.DagCBOR {
return nil
return sz, nil
}
// check this before recursing
if err := s.checkClosing(); err != nil {
return err
return 0, err
}
var links []cid.Cid
err = s.view(c, func(data []byte) error {
sz += int64(len(data))
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
})
if err != nil {
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
}
for _, c := range links {
err := s.walkObject(c, visitor, f)
szLink, err := s.walkObject(c, visitor, f)
if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
}
sz += szLink
}
return nil
return sz, nil
}
// like walkObject, but the object may be potentially incomplete (references missing)
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) (int64, error) {
var sz int64
visit, err := visitor.Visit(c)
if err != nil {
return xerrors.Errorf("error visiting object: %w", err)
return 0, xerrors.Errorf("error visiting object: %w", err)
}
if !visit {
return nil
return sz, nil
}
// occurs check -- only for DAGs
if c.Prefix().Codec == cid.DagCBOR {
has, err := s.has(c)
if err != nil {
return xerrors.Errorf("error occur checking %s: %w", c, err)
return 0, xerrors.Errorf("error occur checking %s: %w", c, err)
}
if !has {
err = missing(c)
if err == errStopWalk {
return nil
return sz, nil
}
return err
return 0, err
}
}
if err := f(c); err != nil {
if err == errStopWalk {
return nil
return sz, nil
}
return err
return 0, err
}
if c.Prefix().Codec != cid.DagCBOR {
return nil
return sz, nil
}
// check this before recursing
if err := s.checkClosing(); err != nil {
return err
return sz, err
}
var links []cid.Cid
err = s.view(c, func(data []byte) error {
sz += int64(len(data))
return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
links = append(links, c)
})
})
if err != nil {
return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
}
for _, c := range links {
err := s.walkObjectIncomplete(c, visitor, f, missing)
szLink, err := s.walkObjectIncomplete(c, visitor, f, missing)
if err != nil {
return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
}
sz += szLink
}
return nil
return sz, nil
}
// internal version used during compaction and related operations
@ -1429,8 +1465,9 @@ func (s *SplitStore) completeCompaction() error {
}
s.compactType = none
// Note: at this point we can start the splitstore; a compaction should run on
// the first head change, which will trigger gc on the hotstore.
// Note: at this point we can start the splitstore; base epoch is not
// incremented here so a compaction should run on the first head
// change, which will trigger gc on the hotstore.
// We don't mind the second (back-to-back) compaction as the head will
// have advanced during marking and coldset accumulation.
return nil
@ -1488,6 +1525,13 @@ func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint,
return nil
}
func (s *SplitStore) clearSizeMeasurements() {
s.szKeys = 0
s.szMarkedLiveRefs = 0
s.szProtectedTxns = 0
s.szWalk = 0
}
// I really don't like having this code, but we seem to have some occasional DAG references with
// missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
// after a little bit.
@ -1528,7 +1572,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
missing = make(map[cid.Cid]struct{})
for c := range towalk {
err := s.walkObjectIncomplete(c, visitor,
_, err := s.walkObjectIncomplete(c, visitor,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk

View File

@ -7,15 +7,61 @@ import (
bstore "github.com/filecoin-project/lotus/blockstore"
)
const (
// Fraction of garbage in badger vlog for online GC traversal to collect garbage
AggressiveOnlineGCThreshold = 0.0001
)
func (s *SplitStore) gcHotAfterCompaction() {
// Measure hotstore size, determine if we should do full GC, determine if we can do full GC.
// We should do full GC if
// FullGCFrequency is specified and compaction index matches frequency
// OR HotstoreMaxSpaceTarget is specified and total moving space is within 150 GB of target
// We can do full if
// HotstoreMaxSpaceTarget is not specified
// OR total moving space would not exceed 50 GB below target
//
// a) If we should not do full GC => online GC
// b) If we should do full GC and can => moving GC
// c) If we should do full GC and can't => aggressive online GC
getSize := func() int64 {
sizer, ok := s.hot.(bstore.BlockstoreSize)
if ok {
size, err := sizer.Size()
if err != nil {
log.Warnf("error getting hotstore size: %s, estimating empty hot store for targeting", err)
return 0
}
return size
}
log.Errorf("Could not measure hotstore size, assuming it is 0 bytes, which it is not")
return 0
}
hotSize := getSize()
copySizeApprox := s.szKeys + s.szMarkedLiveRefs + s.szProtectedTxns + s.szWalk
shouldTarget := s.cfg.HotstoreMaxSpaceTarget > 0 && hotSize+copySizeApprox > int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceThreshold)
shouldFreq := s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0
shouldDoFull := shouldTarget || shouldFreq
canDoFull := s.cfg.HotstoreMaxSpaceTarget == 0 || hotSize+copySizeApprox < int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceSafetyBuffer)
log.Debugw("approximating new hot store size", "key size", s.szKeys, "marked live refs", s.szMarkedLiveRefs, "protected txns", s.szProtectedTxns, "walked DAG", s.szWalk)
log.Infof("measured hot store size: %d, approximate new size: %d, should do full %t, can do full %t", hotSize, copySizeApprox, shouldDoFull, canDoFull)
var opts []bstore.BlockstoreGCOption
if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
if shouldDoFull && canDoFull {
opts = append(opts, bstore.WithFullGC(true))
} else if shouldDoFull && !canDoFull {
log.Warnf("Attention! Estimated moving GC size %d is not within safety buffer %d of target max %d, performing aggressive online GC to attempt to bring hotstore size down safely", copySizeApprox, s.cfg.HotstoreMaxSpaceSafetyBuffer, s.cfg.HotstoreMaxSpaceTarget)
log.Warn("If problem continues you can 1) temporarily allocate more disk space to hotstore and 2) reflect in HotstoreMaxSpaceTarget OR trigger manual move with `lotus chain prune hot-moving`")
log.Warn("If problem continues and you do not have any more disk space you can run continue to manually trigger online GC at aggressive thresholds (< 0.01) with `lotus chain prune hot`")
opts = append(opts, bstore.WithThreshold(AggressiveOnlineGCThreshold))
}
if err := s.gcBlockstore(s.hot, opts); err != nil {
log.Warnf("error garbage collecting hostore: %s", err)
}
log.Infof("measured hot store size after GC: %d", getSize())
}
func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {

View File

@ -101,7 +101,7 @@ func (s *SplitStore) doReify(c cid.Cid) {
defer s.txnLk.RUnlock()
count := 0
err := s.walkObjectIncomplete(c, newTmpVisitor(),
_, err := s.walkObjectIncomplete(c, newTmpVisitor(),
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk

Binary file not shown.

View File

@ -256,7 +256,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
//}
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds)
if err != nil {
return nil, xerrors.Errorf("initing stmgr: %w", err)
}

View File

@ -13,7 +13,11 @@ import (
)
var (
ReplaceByFeeRatioDefault = 1.25
ReplaceByFeePercentageMinimum types.Percent = 110
ReplaceByFeePercentageDefault types.Percent = 125
)
var (
MemPoolSizeLimitHiDefault = 30000
MemPoolSizeLimitLoDefault = 20000
PruneCooldownDefault = time.Minute
@ -60,9 +64,9 @@ func (mp *MessagePool) getConfig() *types.MpoolConfig {
}
func validateConfg(cfg *types.MpoolConfig) error {
if cfg.ReplaceByFeeRatio < ReplaceByFeeRatioDefault {
return fmt.Errorf("'ReplaceByFeeRatio' is less than required %f < %f",
cfg.ReplaceByFeeRatio, ReplaceByFeeRatioDefault)
if cfg.ReplaceByFeeRatio < ReplaceByFeePercentageMinimum {
return fmt.Errorf("'ReplaceByFeeRatio' is less than required %s < %s",
cfg.ReplaceByFeeRatio, ReplaceByFeePercentageMinimum)
}
if cfg.GasLimitOverestimation < 1 {
return fmt.Errorf("'GasLimitOverestimation' cannot be less than 1")
@ -91,7 +95,7 @@ func DefaultConfig() *types.MpoolConfig {
return &types.MpoolConfig{
SizeLimitHigh: MemPoolSizeLimitHiDefault,
SizeLimitLow: MemPoolSizeLimitLoDefault,
ReplaceByFeeRatio: ReplaceByFeeRatioDefault,
ReplaceByFeeRatio: ReplaceByFeePercentageDefault,
PruneCooldown: PruneCooldownDefault,
GasLimitOverestimation: GasLimitOverestimation,
}

View File

@ -47,10 +47,8 @@ var log = logging.Logger("messagepool")
var futureDebug = false
var rbfNumBig = types.NewInt(uint64((ReplaceByFeeRatioDefault - 1) * RbfDenom))
var rbfDenomBig = types.NewInt(RbfDenom)
const RbfDenom = 256
var rbfNumBig = types.NewInt(uint64(ReplaceByFeePercentageMinimum))
var rbfDenomBig = types.NewInt(100)
var RepublishInterval = time.Duration(10*build.BlockDelaySecs+build.PropagationDelaySecs) * time.Second
@ -197,7 +195,13 @@ func newMsgSet(nonce uint64) *msgSet {
}
func ComputeMinRBF(curPrem abi.TokenAmount) abi.TokenAmount {
minPrice := types.BigAdd(curPrem, types.BigDiv(types.BigMul(curPrem, rbfNumBig), rbfDenomBig))
minPrice := types.BigDiv(types.BigMul(curPrem, rbfNumBig), rbfDenomBig)
return types.BigAdd(minPrice, types.NewInt(1))
}
func ComputeRBF(curPrem abi.TokenAmount, replaceByFeeRatio types.Percent) abi.TokenAmount {
rbfNumBig := types.NewInt(uint64(replaceByFeeRatio))
minPrice := types.BigDiv(types.BigMul(curPrem, rbfNumBig), rbfDenomBig)
return types.BigAdd(minPrice, types.NewInt(1))
}

View File

@ -174,9 +174,16 @@ func (us UpgradeSchedule) GetNtwkVersion(e abi.ChainEpoch) (network.Version, err
func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
retCid := root
var err error
u := sm.stateMigrations[height]
if u != nil && u.upgrade != nil {
migCid, ok, err := u.migrationResultCache.Get(ctx, root)
if err == nil && ok {
log.Infow("CACHED migration", "height", height, "from", root, "to", migCid)
return migCid, nil
} else if err != nil {
log.Errorw("failed to lookup previous migration result", "err", err)
}
startTime := time.Now()
log.Warnw("STARTING migration", "height", height, "from", root)
// Yes, we clone the cache, even for the final upgrade epoch. Why? Reverts. We may
@ -197,6 +204,11 @@ func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, heig
"to", retCid,
"duration", time.Since(startTime),
)
// Only set if migration ran, we do not want a root => root mapping
if err := u.migrationResultCache.Store(ctx, root, retCid); err != nil {
log.Errorw("failed to store migration result", "err", err)
}
}
return retCid, nil

View File

@ -10,6 +10,7 @@ import (
"testing"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
ipldcbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
@ -35,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/stmgr"
. "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
@ -166,7 +168,7 @@ func TestForkHeightTriggers(t *testing.T) {
}
return st.Flush(ctx)
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
@ -284,7 +286,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
migrationCount++
return root, nil
}}}, cg.BeaconSchedule())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
@ -502,7 +504,7 @@ func TestForkPreMigration(t *testing.T) {
return nil
},
}}},
}, cg.BeaconSchedule())
}, cg.BeaconSchedule(), datastore.NewMapDatastore())
if err != nil {
t.Fatal(err)
}
@ -576,6 +578,7 @@ func TestDisablePreMigration(t *testing.T) {
}}},
},
cg.BeaconSchedule(),
datastore.NewMapDatastore(),
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
@ -603,3 +606,102 @@ func TestDisablePreMigration(t *testing.T) {
require.Equal(t, 1, len(counter))
}
func TestMigrtionCache(t *testing.T) {
logging.SetAllLoggers(logging.LevelInfo)
cg, err := gen.NewGenerator()
require.NoError(t, err)
counter := make(chan struct{}, 10)
metadataDs := datastore.NewMapDatastore()
sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {
counter <- struct{}{}
return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
defer func() {
require.NoError(t, sm.Stop(context.Background()))
}()
inv := consensus.NewActorRegistry()
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
inv.Register(actorstypes.Version0, nil, registry)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})
cg.SetStateManager(sm)
for i := 0; i < 50; i++ {
_, err := cg.NextTipSet()
require.NoError(t, err)
}
ts, err := cg.ChainStore().GetTipsetByHeight(context.Background(), testForkHeight, nil, false)
require.NoError(t, err)
root, _, err := stmgr.ComputeState(context.Background(), sm, testForkHeight+1, []*types.Message{}, ts)
require.NoError(t, err)
t.Log(root)
require.Equal(t, 1, len(counter))
{
sm, err := NewStateManager(
cg.ChainStore(),
consensus.NewTipSetExecutor(filcns.RewardFunc),
cg.StateManager().VMSys(),
UpgradeSchedule{{
Network: network.Version1,
Height: testForkHeight,
Migration: func(_ context.Context, _ *StateManager, _ MigrationCache, _ ExecMonitor,
root cid.Cid, _ abi.ChainEpoch, _ *types.TipSet) (cid.Cid, error) {
counter <- struct{}{}
return root, nil
}},
},
cg.BeaconSchedule(),
metadataDs,
)
require.NoError(t, err)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
nvm, err := vm.NewLegacyVM(ctx, vmopt)
require.NoError(t, err)
nvm.SetInvoker(inv)
return nvm, nil
})
ctx := context.Background()
base, _, err := sm.ExecutionTrace(ctx, ts)
require.NoError(t, err)
_, err = sm.HandleStateForks(context.Background(), base, ts.Height(), nil, ts)
require.NoError(t, err)
// Should not have increased as we should be using the cached results in the metadataDs
require.Equal(t, 1, len(counter))
}
}

View File

@ -2,10 +2,13 @@ package stmgr
import (
"context"
"fmt"
"sync"
"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
@ -51,9 +54,47 @@ type versionSpec struct {
}
type migration struct {
upgrade MigrationFunc
preMigrations []PreMigration
cache *nv16.MemMigrationCache
upgrade MigrationFunc
preMigrations []PreMigration
cache *nv16.MemMigrationCache
migrationResultCache *migrationResultCache
}
type migrationResultCache struct {
ds dstore.Batching
keyPrefix string
}
func (m *migrationResultCache) keyForMigration(root cid.Cid) dstore.Key {
kStr := fmt.Sprintf("%s/%s", m.keyPrefix, root)
return dstore.NewKey(kStr)
}
func (m *migrationResultCache) Get(ctx context.Context, root cid.Cid) (cid.Cid, bool, error) {
k := m.keyForMigration(root)
bs, err := m.ds.Get(ctx, k)
if ipld.IsNotFound(err) {
return cid.Undef, false, nil
} else if err != nil {
return cid.Undef, false, xerrors.Errorf("error loading migration result: %w", err)
}
c, err := cid.Parse(bs)
if err != nil {
return cid.Undef, false, xerrors.Errorf("error parsing migration result: %w", err)
}
return c, true, nil
}
func (m *migrationResultCache) Store(ctx context.Context, root cid.Cid, resultCid cid.Cid) error {
k := m.keyForMigration(root)
if err := m.ds.Put(ctx, k, resultCid.Bytes()); err != nil {
return err
}
return nil
}
type Executor interface {
@ -103,7 +144,7 @@ type treeCache struct {
tree *state.StateTree
}
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule) (*StateManager, error) {
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching) (*StateManager, error) {
// If we have upgrades, make sure they're in-order and make sense.
if err := us.Validate(); err != nil {
return nil, err
@ -122,12 +163,18 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
upgrade: upgrade.Migration,
preMigrations: upgrade.PreMigrations,
cache: nv16.NewMemMigrationCache(),
migrationResultCache: &migrationResultCache{
keyPrefix: fmt.Sprintf("/migration-cache/nv%d", upgrade.Network),
ds: metadataDs,
},
}
stateMigrations[upgrade.Height] = migration
}
if upgrade.Expensive {
expensiveUpgrades[upgrade.Height] = struct{}{}
}
networkVersions = append(networkVersions, versionSpec{
networkVersion: lastVersion,
atOrBelow: upgrade.Height,
@ -155,8 +202,8 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
}, nil
}
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b)
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs)
if err != nil {
return nil, err
}

View File

@ -4,8 +4,8 @@ import (
"context"
"os"
"strconv"
"sync"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
@ -13,7 +13,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
var DefaultChainIndexCacheSize = 32 << 10
var DefaultChainIndexCacheSize = 32 << 15
func init() {
if s := os.Getenv("LOTUS_CHAIN_INDEX_CACHE"); s != "" {
@ -27,7 +27,8 @@ func init() {
}
type ChainIndex struct {
skipCache *lru.ARCCache
indexCacheLk sync.Mutex
indexCache map[types.TipSetKey]*lbEntry
loadTipSet loadTipSetFunc
@ -36,17 +37,14 @@ type ChainIndex struct {
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)
func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
sc, _ := lru.NewARC(DefaultChainIndexCacheSize)
return &ChainIndex{
skipCache: sc,
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
loadTipSet: lts,
skipLength: 20,
}
}
type lbEntry struct {
ts *types.TipSet
parentHeight abi.ChainEpoch
targetHeight abi.ChainEpoch
target types.TipSetKey
}
@ -58,25 +56,36 @@ func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet,
rounded, err := ci.roundDown(ctx, from)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to round down: %w", err)
}
ci.indexCacheLk.Lock()
defer ci.indexCacheLk.Unlock()
cur := rounded.Key()
for {
cval, ok := ci.skipCache.Get(cur)
lbe, ok := ci.indexCache[cur]
if !ok {
fc, err := ci.fillCache(ctx, cur)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to fill cache: %w", err)
}
cval = fc
lbe = fc
}
lbe := cval.(*lbEntry)
if lbe.ts.Height() == to || lbe.parentHeight < to {
return lbe.ts, nil
} else if to > lbe.targetHeight {
return ci.walkBack(ctx, lbe.ts, to)
if to == lbe.targetHeight {
ts, err := ci.loadTipSet(ctx, lbe.target)
if err != nil {
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
return ts, nil
}
if to > lbe.targetHeight {
ts, err := ci.loadTipSet(ctx, cur)
if err != nil {
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
return ci.walkBack(ctx, ts, to)
}
cur = lbe.target
@ -87,16 +96,17 @@ func (ci *ChainIndex) GetTipsetByHeightWithoutCache(ctx context.Context, from *t
return ci.walkBack(ctx, from, to)
}
// Caller must hold indexCacheLk
func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEntry, error) {
ts, err := ci.loadTipSet(ctx, tsk)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
if ts.Height() == 0 {
return &lbEntry{
ts: ts,
parentHeight: 0,
targetHeight: 0,
target: tsk,
}, nil
}
@ -124,12 +134,10 @@ func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEn
}
lbe := &lbEntry{
ts: ts,
parentHeight: parent.Height(),
targetHeight: skipTarget.Height(),
target: skipTarget.Key(),
}
ci.skipCache.Add(tsk, lbe)
ci.indexCache[tsk] = lbe
return lbe, nil
}
@ -144,7 +152,7 @@ func (ci *ChainIndex) roundDown(ctx context.Context, ts *types.TipSet) (*types.T
rounded, err := ci.walkBack(ctx, ts, target)
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to walk back: %w", err)
}
return rounded, nil
@ -164,7 +172,7 @@ func (ci *ChainIndex) walkBack(ctx context.Context, from *types.TipSet, to abi.C
for {
pts, err := ci.loadTipSet(ctx, ts.Parents())
if err != nil {
return nil, err
return nil, xerrors.Errorf("failed to load tipset: %w", err)
}
if to > pts.Height() {

View File

@ -196,7 +196,8 @@ func TestChainExportImportFull(t *testing.T) {
}
nbs := blockstore.NewMemorySync()
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), filcns.Weight, nil)
ds := datastore.NewMapDatastore()
cs := store.NewChainStore(nbs, nbs, ds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
root, err := cs.Import(context.TODO(), buf)
@ -213,7 +214,7 @@ func TestChainExportImportFull(t *testing.T) {
t.Fatal("imported chain differed from exported chain")
}
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule())
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds)
if err != nil {
t.Fatal(err)
}

View File

@ -10,7 +10,7 @@ type MpoolConfig struct {
PriorityAddrs []address.Address
SizeLimitHigh int
SizeLimitLow int
ReplaceByFeeRatio float64
ReplaceByFeeRatio Percent
PruneCooldown time.Duration
GasLimitOverestimation float64
}

39
chain/types/percent.go Normal file
View File

@ -0,0 +1,39 @@
package types
import (
"fmt"
"math"
"strconv"
"golang.org/x/xerrors"
)
// Percent stores a signed percentage as an int64. When converted to a string (or json), it's stored
// as a decimal with two places (e.g., 100% -> 1.00).
type Percent int64
func (p Percent) String() string {
abs := p
sign := ""
if abs < 0 {
abs = -abs
sign = "-"
}
return fmt.Sprintf(`%s%d.%d`, sign, abs/100, abs%100)
}
func (p Percent) MarshalJSON() ([]byte, error) {
return []byte(p.String()), nil
}
func (p *Percent) UnmarshalJSON(b []byte) error {
flt, err := strconv.ParseFloat(string(b)+"e2", 64)
if err != nil {
return xerrors.Errorf("unable to parse ratio %s: %w", string(b), err)
}
if math.Trunc(flt) != flt {
return xerrors.Errorf("ratio may only have two decimals: %s", string(b))
}
*p = Percent(flt)
return nil
}

View File

@ -0,0 +1,34 @@
package types
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
func TestPercent(t *testing.T) {
for _, tc := range []struct {
p Percent
s string
}{
{100, "1.0"},
{111, "1.11"},
{12, "0.12"},
{-12, "-0.12"},
{1012, "10.12"},
{-1012, "-10.12"},
{0, "0.0"},
} {
tc := tc
t.Run(fmt.Sprintf("%d <> %s", tc.p, tc.s), func(t *testing.T) {
m, err := tc.p.MarshalJSON()
require.NoError(t, err)
require.Equal(t, tc.s, string(m))
var p Percent
require.NoError(t, p.UnmarshalJSON([]byte(tc.s)))
require.Equal(t, tc.p, p)
})
}
}

View File

@ -461,7 +461,12 @@ var MpoolReplaceCmd = &cli.Command{
msg := found.Message
if cctx.Bool("auto") {
minRBF := messagepool.ComputeMinRBF(msg.GasPremium)
cfg, err := api.MpoolGetConfig(ctx)
if err != nil {
return xerrors.Errorf("failed to lookup the message pool config: %w", err)
}
defaultRBF := messagepool.ComputeRBF(msg.GasPremium, cfg.ReplaceByFeeRatio)
var mss *lapi.MessageSendSpec
if cctx.IsSet("fee-limit") {
@ -482,7 +487,7 @@ var MpoolReplaceCmd = &cli.Command{
return xerrors.Errorf("failed to estimate gas values: %w", err)
}
msg.GasPremium = big.Max(retm.GasPremium, minRBF)
msg.GasPremium = big.Max(retm.GasPremium, defaultRBF)
msg.GasFeeCap = big.Max(retm.GasFeeCap, msg.GasPremium)
mff := func() (abi.TokenAmount, error) {

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
"github.com/filecoin-project/lotus/chain/wallet"
@ -298,6 +299,7 @@ func TestReplace(t *testing.T) {
mockApi.EXPECT().ChainGetMessage(ctx, sm.Cid()).Return(&sm.Message, nil),
mockApi.EXPECT().ChainHead(ctx).Return(nil, nil),
mockApi.EXPECT().MpoolPending(ctx, types.EmptyTSK).Return([]*types.SignedMessage{sm}, nil),
mockApi.EXPECT().MpoolGetConfig(ctx).Return(messagepool.DefaultConfig(), nil),
// use gomock.any to match the message in expected api calls
// since the replace function modifies the message between calls, it would be pointless to try to match the exact argument
mockApi.EXPECT().GasEstimateMessageGas(ctx, gomock.Any(), &mss, types.EmptyTSK).Return(&sm.Message, nil),
@ -342,6 +344,7 @@ func TestReplace(t *testing.T) {
gomock.InOrder(
mockApi.EXPECT().ChainHead(ctx).Return(nil, nil),
mockApi.EXPECT().MpoolPending(ctx, types.EmptyTSK).Return([]*types.SignedMessage{sm}, nil),
mockApi.EXPECT().MpoolGetConfig(ctx).Return(messagepool.DefaultConfig(), nil),
// use gomock.any to match the message in expected api calls
// since the replace function modifies the message between calls, it would be pointless to try to match the exact argument
mockApi.EXPECT().GasEstimateMessageGas(ctx, gomock.Any(), &mss, types.EmptyTSK).Return(&sm.Message, nil),
@ -538,7 +541,7 @@ func TestConfig(t *testing.T) {
t.Fatal(err)
}
mpoolCfg := &types.MpoolConfig{PriorityAddrs: []address.Address{senderAddr}, SizeLimitHigh: 1234567, SizeLimitLow: 6, ReplaceByFeeRatio: 0.25}
mpoolCfg := &types.MpoolConfig{PriorityAddrs: []address.Address{senderAddr}, SizeLimitHigh: 1234567, SizeLimitLow: 6, ReplaceByFeeRatio: types.Percent(25)}
gomock.InOrder(
mockApi.EXPECT().MpoolGetConfig(ctx).Return(mpoolCfg, nil),
)
@ -566,7 +569,7 @@ func TestConfig(t *testing.T) {
t.Fatal(err)
}
mpoolCfg := &types.MpoolConfig{PriorityAddrs: []address.Address{senderAddr}, SizeLimitHigh: 234567, SizeLimitLow: 3, ReplaceByFeeRatio: 0.33}
mpoolCfg := &types.MpoolConfig{PriorityAddrs: []address.Address{senderAddr}, SizeLimitHigh: 234567, SizeLimitLow: 3, ReplaceByFeeRatio: types.Percent(33)}
gomock.InOrder(
mockApi.EXPECT().MpoolSetConfig(ctx, mpoolCfg).Return(nil),
)

View File

@ -229,7 +229,7 @@ var importBenchCmd = &cli.Command{
defer cs.Close() //nolint:errcheck
// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil)
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs)
if err != nil {
return err
}

View File

@ -513,7 +513,7 @@ var chainBalanceStateCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}
@ -737,7 +737,7 @@ var chainPledgeCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}

View File

@ -111,7 +111,7 @@ var gasTraceCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}
@ -212,7 +212,7 @@ var replayOfflineCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
if err != nil {
return err
}

View File

@ -90,7 +90,7 @@ var invariantsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
@ -121,7 +122,8 @@ var migrationsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
// Note: we use a map datastore for the metadata to avoid writing / using cached migration results in the metadata store
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, datastore.NewMapDatastore())
if err != nil {
return err
}

View File

@ -308,7 +308,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}
tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}

View File

@ -106,7 +106,7 @@ func (nd *Node) LoadSim(ctx context.Context, name string) (*Simulation, error) {
if err != nil {
return nil, xerrors.Errorf("failed to create upgrade schedule for simulation %s: %w", name, err)
}
sim.StateManager, err = stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), us, nil)
sim.StateManager, err = stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), us, nil, nd.MetadataDS)
if err != nil {
return nil, xerrors.Errorf("failed to create state manager for simulation %s: %w", name, err)
}
@ -125,7 +125,7 @@ func (nd *Node) CreateSim(ctx context.Context, name string, head *types.TipSet)
if err != nil {
return nil, err
}
sm, err := stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), filcns.DefaultUpgradeSchedule(), nil)
sm, err := stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), filcns.DefaultUpgradeSchedule(), nil, nd.MetadataDS)
if err != nil {
return nil, xerrors.Errorf("creating state manager: %w", err)
}

View File

@ -201,7 +201,7 @@ func (sim *Simulation) SetUpgradeHeight(nv network.Version, epoch abi.ChainEpoch
if err != nil {
return err
}
sm, err := stmgr.NewStateManager(sim.Node.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), newUpgradeSchedule, nil)
sm, err := stmgr.NewStateManager(sim.Node.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), newUpgradeSchedule, nil, sim.Node.MetadataDS)
if err != nil {
return err
}

View File

@ -540,7 +540,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}
// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil)
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
if err != nil {
return err
}

View File

@ -108,7 +108,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params
cs = store.NewChainStore(bs, bs, ds, filcns.Weight, nil)
tse = consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err = stmgr.NewStateManager(cs, tse, syscalls, filcns.DefaultUpgradeSchedule(), nil)
sm, err = stmgr.NewStateManager(cs, tse, syscalls, filcns.DefaultUpgradeSchedule(), nil, ds)
)
if err != nil {
return nil, err

View File

@ -2879,7 +2879,7 @@ Response:
],
"SizeLimitHigh": 123,
"SizeLimitLow": 123,
"ReplaceByFeeRatio": 12.3,
"ReplaceByFeeRatio": 1.23,
"PruneCooldown": 60000000000,
"GasLimitOverestimation": 12.3
}
@ -3167,7 +3167,7 @@ Inputs:
],
"SizeLimitHigh": 123,
"SizeLimitLow": 123,
"ReplaceByFeeRatio": 12.3,
"ReplaceByFeeRatio": 1.23,
"PruneCooldown": 60000000000,
"GasLimitOverestimation": 12.3
}

View File

@ -3882,7 +3882,7 @@ Response:
],
"SizeLimitHigh": 123,
"SizeLimitLow": 123,
"ReplaceByFeeRatio": 12.3,
"ReplaceByFeeRatio": 1.23,
"PruneCooldown": 60000000000,
"GasLimitOverestimation": 12.3
}
@ -4170,7 +4170,7 @@ Inputs:
],
"SizeLimitHigh": 123,
"SizeLimitLow": 123,
"ReplaceByFeeRatio": 12.3,
"ReplaceByFeeRatio": 1.23,
"PruneCooldown": 60000000000,
"GasLimitOverestimation": 12.3
}

View File

@ -191,7 +191,7 @@
[Chainstore]
# type: bool
# env var: LOTUS_CHAINSTORE_ENABLESPLITSTORE
#EnableSplitstore = false
#EnableSplitstore = true
[Chainstore.Splitstore]
# ColdStoreType specifies the type of the coldstore.
@ -199,7 +199,7 @@
#
# type: string
# env var: LOTUS_CHAINSTORE_SPLITSTORE_COLDSTORETYPE
#ColdStoreType = "messages"
#ColdStoreType = "discard"
# HotStoreType specifies the type of the hotstore.
# Only currently supported value is "badger".
@ -230,6 +230,35 @@
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
#HotStoreFullGCFrequency = 20
# HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
# will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
# Splitstore GC will NOT run moving GC if the total size of the move would get
# within 50 GB of the target, and instead will run a more aggressive online GC.
# If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
# GC will trigger moving GC if either configuration condition is met.
# A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
# At this minimum size moving GC happens every time, any smaller and moving GC won't
# be able to run. In spring 2023 this minimum is ~550 GB.
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETARGET
#HotStoreMaxSpaceTarget = 0
# When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
# exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETHRESHOLD
#HotStoreMaxSpaceThreshold = 150000000000
# Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
# is set. Moving GC will not occur when total moving size exceeds
# HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
#
# type: uint64
# env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACESAFETYBUFFER
#HotstoreMaxSpaceSafetyBuffer = 50000000000
[Cluster]
# EXPERIMENTAL. config to enabled node cluster with raft consensus

View File

@ -3,6 +3,7 @@ package itests
import (
"context"
"fmt"
"strings"
"testing"
"time"
@ -55,6 +56,10 @@ func TestEthBlockHashesCorrect_MultiBlockTipset(t *testing.T) {
hex := fmt.Sprintf("0x%x", i)
ethBlockA, err := n2.EthGetBlockByNumber(ctx, hex, true)
// Cannot use static ErrFullRound error for comparison since it gets reserialized as a JSON RPC error.
if err != nil && strings.Contains(err.Error(), "null round") {
continue
}
require.NoError(t, err)
ethBlockB, err := n2.EthGetBlockByHash(ctx, ethBlockA.Hash, true)

View File

@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
"github.com/filecoin-project/lotus/itests/kit"
@ -270,6 +271,57 @@ func TestContractInvocation(t *testing.T) {
require.EqualValues(t, ethtypes.EthUint64(0x1), receipt.Status)
}
func TestGetBlockByNumber(t *testing.T) {
blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
bms := ens.InterconnectAll().BeginMining(blockTime)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// create a new Ethereum account
_, ethAddr, filAddr := client.EVM().NewAccount()
// send some funds to the f410 address
kit.SendFunds(ctx, t, client, filAddr, types.FromFil(10))
latest, err := client.EthBlockNumber(ctx)
require.NoError(t, err)
// can get the latest block
_, err = client.EthGetBlockByNumber(ctx, latest.Hex(), true)
require.NoError(t, err)
// fail to get a future block
_, err = client.EthGetBlockByNumber(ctx, (latest + 10000).Hex(), true)
require.Error(t, err)
// inject 10 null rounds
bms[0].InjectNulls(10)
// wait until we produce blocks again
tctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
ch, err := client.ChainNotify(tctx)
require.NoError(t, err)
<-ch // current
hc := <-ch // wait for next block
require.Equal(t, store.HCApply, hc[0].Type)
afterNullHeight := hc[0].Val.Height()
// Fail when trying to fetch a null round.
_, err = client.EthGetBlockByNumber(ctx, (ethtypes.EthUint64(afterNullHeight - 1)).Hex(), true)
require.Error(t, err)
// Fetch balance on a null round; should not fail and should return previous balance.
// Should be lower than original balance.
bal, err := client.EthGetBalance(ctx, ethAddr, (ethtypes.EthUint64(afterNullHeight - 1)).Hex())
require.NoError(t, err)
require.NotEqual(t, big.Zero(), bal)
require.Equal(t, types.FromFil(10).Int, bal.Int)
}
func deployContractTx(ctx context.Context, client *kit.TestFullNode, ethAddr ethtypes.EthAddress, contract []byte) (*ethtypes.EthTxArgs, error) {
gaslimit, err := client.EthEstimateGas(ctx, ethtypes.EthCall{
From: &ethAddr,

View File

@ -89,13 +89,15 @@ func DefaultFullNode() *FullNode {
SimultaneousTransfersForRetrieval: DefaultSimultaneousTransfers,
},
Chainstore: Chainstore{
EnableSplitstore: false,
EnableSplitstore: true,
Splitstore: Splitstore{
ColdStoreType: "messages",
ColdStoreType: "discard",
HotStoreType: "badger",
MarkSetType: "badger",
HotStoreFullGCFrequency: 20,
HotStoreFullGCFrequency: 20,
HotStoreMaxSpaceThreshold: 150_000_000_000,
HotstoreMaxSpaceSafetyBuffer: 50_000_000_000,
},
},
Cluster: *DefaultUserRaftConfig(),

View File

@ -1286,6 +1286,35 @@ the compaction boundary; default is 0.`,
A value of 0 disables, while a value 1 will do full GC in every compaction.
Default is 20 (about once a week).`,
},
{
Name: "HotStoreMaxSpaceTarget",
Type: "uint64",
Comment: `HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
Splitstore GC will NOT run moving GC if the total size of the move would get
within 50 GB of the target, and instead will run a more aggressive online GC.
If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
GC will trigger moving GC if either configuration condition is met.
A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
At this minimum size moving GC happens every time, any smaller and moving GC won't
be able to run. In spring 2023 this minimum is ~550 GB.`,
},
{
Name: "HotStoreMaxSpaceThreshold",
Type: "uint64",
Comment: `When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold`,
},
{
Name: "HotstoreMaxSpaceSafetyBuffer",
Type: "uint64",
Comment: `Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
is set. Moving GC will not occur when total moving size exceeds
HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer`,
},
},
"StorageMiner": []DocField{
{

View File

@ -601,6 +601,25 @@ type Splitstore struct {
// A value of 0 disables, while a value 1 will do full GC in every compaction.
// Default is 20 (about once a week).
HotStoreFullGCFrequency uint64
// HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
// will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
// Splitstore GC will NOT run moving GC if the total size of the move would get
// within 50 GB of the target, and instead will run a more aggressive online GC.
// If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
// GC will trigger moving GC if either configuration condition is met.
// A reasonable minimum is 2x fully GCed hotstore size + 50 G buffer.
// At this minimum size moving GC happens every time, any smaller and moving GC won't
// be able to run. In spring 2023 this minimum is ~550 GB.
HotStoreMaxSpaceTarget uint64
// When HotStoreMaxSpaceTarget is set Moving GC will be triggered when total moving size
// exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
HotStoreMaxSpaceThreshold uint64
// Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
// is set. Moving GC will not occur when total moving size exceeds
// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
HotstoreMaxSpaceSafetyBuffer uint64
}
// // Full Node

View File

@ -153,6 +153,8 @@ type EthAPI struct {
EthEventAPI
}
var ErrNullRound = errors.New("requested epoch was a null round")
func (a *EthModule) StateNetworkName(ctx context.Context) (dtypes.NetworkName, error) {
return stmgr.GetNetworkName(ctx, a.StateManager, a.Chain.GetHeaviestTipSet().ParentState())
}
@ -231,7 +233,7 @@ func (a *EthModule) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthH
return newEthBlockFromFilecoinTipSet(ctx, ts, fullTxInfo, a.Chain, a.StateAPI)
}
func (a *EthModule) parseBlkParam(ctx context.Context, blkParam string) (tipset *types.TipSet, err error) {
func (a *EthModule) parseBlkParam(ctx context.Context, blkParam string, strict bool) (tipset *types.TipSet, err error) {
if blkParam == "earliest" {
return nil, fmt.Errorf("block param \"earliest\" is not supported")
}
@ -252,16 +254,22 @@ func (a *EthModule) parseBlkParam(ctx context.Context, blkParam string) (tipset
if err != nil {
return nil, fmt.Errorf("cannot parse block number: %v", err)
}
ts, err := a.Chain.GetTipsetByHeight(ctx, abi.ChainEpoch(num), nil, true)
if abi.ChainEpoch(num) > head.Height()-1 {
return nil, fmt.Errorf("requested a future epoch (beyond 'latest')")
}
ts, err := a.ChainAPI.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(num), head.Key())
if err != nil {
return nil, fmt.Errorf("cannot get tipset at height: %v", num)
}
if strict && ts.Height() != abi.ChainEpoch(num) {
return nil, ErrNullRound
}
return ts, nil
}
}
func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkParam string, fullTxInfo bool) (ethtypes.EthBlock, error) {
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, true)
if err != nil {
return ethtypes.EthBlock{}, err
}
@ -367,7 +375,7 @@ func (a *EthModule) EthGetTransactionCount(ctx context.Context, sender ethtypes.
return ethtypes.EthUint64(0), nil
}
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, false)
if err != nil {
return ethtypes.EthUint64(0), xerrors.Errorf("cannot parse block param: %s", blkParam)
}
@ -433,7 +441,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash ethtype
}
}
receipt, err := newEthTxReceipt(ctx, tx, msgLookup, replay, events, a.StateAPI)
receipt, err := newEthTxReceipt(ctx, tx, replay, events, a.StateAPI)
if err != nil {
return nil, nil
}
@ -456,7 +464,7 @@ func (a *EthModule) EthGetCode(ctx context.Context, ethAddr ethtypes.EthAddress,
return nil, xerrors.Errorf("cannot get Filecoin address: %w", err)
}
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, false)
if err != nil {
return nil, xerrors.Errorf("cannot parse block param: %s", blkParam)
}
@ -535,7 +543,7 @@ func (a *EthModule) EthGetCode(ctx context.Context, ethAddr ethtypes.EthAddress,
}
func (a *EthModule) EthGetStorageAt(ctx context.Context, ethAddr ethtypes.EthAddress, position ethtypes.EthBytes, blkParam string) (ethtypes.EthBytes, error) {
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, false)
if err != nil {
return nil, xerrors.Errorf("cannot parse block param: %s", blkParam)
}
@ -631,7 +639,7 @@ func (a *EthModule) EthGetBalance(ctx context.Context, address ethtypes.EthAddre
return ethtypes.EthBigInt{}, err
}
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, false)
if err != nil {
return ethtypes.EthBigInt{}, xerrors.Errorf("cannot parse block param: %s", blkParam)
}
@ -676,7 +684,7 @@ func (a *EthModule) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (eth
}
}
ts, err := a.parseBlkParam(ctx, params.NewestBlkNum)
ts, err := a.parseBlkParam(ctx, params.NewestBlkNum, false)
if err != nil {
return ethtypes.EthFeeHistory{}, fmt.Errorf("bad block parameter %s: %s", params.NewestBlkNum, err)
}
@ -1067,7 +1075,7 @@ func (a *EthModule) EthCall(ctx context.Context, tx ethtypes.EthCall, blkParam s
return nil, xerrors.Errorf("failed to convert ethcall to filecoin message: %w", err)
}
ts, err := a.parseBlkParam(ctx, blkParam)
ts, err := a.parseBlkParam(ctx, blkParam, false)
if err != nil {
return nil, xerrors.Errorf("cannot parse block param: %s", blkParam)
}
@ -1797,12 +1805,16 @@ func newEthBlockFromFilecoinTipSet(ctx context.Context, ts *types.TipSet, fullTx
return ethtypes.EthBlock{}, xerrors.Errorf("failed to compute state: %w", err)
}
for txIdx, msg := range compOutput.Trace {
txIdx := 0
for _, msg := range compOutput.Trace {
// skip system messages like reward application and cron
if msg.Msg.From == builtintypes.SystemActorAddr {
continue
}
ti := ethtypes.EthUint64(txIdx)
txIdx++
gasUsed += msg.MsgRct.GasUsed
smsgCid, err := getSignedMessage(ctx, cs, msg.MsgCid)
if err != nil {
@ -1813,8 +1825,6 @@ func newEthBlockFromFilecoinTipSet(ctx context.Context, ts *types.TipSet, fullTx
return ethtypes.EthBlock{}, xerrors.Errorf("failed to convert msg to ethTx: %w", err)
}
ti := ethtypes.EthUint64(txIdx)
tx.ChainID = ethtypes.EthUint64(build.Eip155ChainId)
tx.BlockHash = &blkHash
tx.BlockNumber = &bn
@ -2031,7 +2041,7 @@ func newEthTxFromMessageLookup(ctx context.Context, msgLookup *api.MsgLookup, tx
return tx, nil
}
func newEthTxReceipt(ctx context.Context, tx ethtypes.EthTx, lookup *api.MsgLookup, replay *api.InvocResult, events []types.Event, sa StateAPI) (api.EthTxReceipt, error) {
func newEthTxReceipt(ctx context.Context, tx ethtypes.EthTx, replay *api.InvocResult, events []types.Event, sa StateAPI) (api.EthTxReceipt, error) {
var (
transactionIndex ethtypes.EthUint64
blockHash ethtypes.EthHash
@ -2060,25 +2070,25 @@ func newEthTxReceipt(ctx context.Context, tx ethtypes.EthTx, lookup *api.MsgLook
LogsBloom: ethtypes.EmptyEthBloom[:],
}
if lookup.Receipt.ExitCode.IsSuccess() {
if replay.MsgRct.ExitCode.IsSuccess() {
receipt.Status = 1
}
if lookup.Receipt.ExitCode.IsError() {
if replay.MsgRct.ExitCode.IsError() {
receipt.Status = 0
}
receipt.GasUsed = ethtypes.EthUint64(lookup.Receipt.GasUsed)
receipt.GasUsed = ethtypes.EthUint64(replay.MsgRct.GasUsed)
// TODO: handle CumulativeGasUsed
receipt.CumulativeGasUsed = ethtypes.EmptyEthInt
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed))
effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(replay.MsgRct.GasUsed))
receipt.EffectiveGasPrice = ethtypes.EthBigInt(effectiveGasPrice)
if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() {
if receipt.To == nil && replay.MsgRct.ExitCode.IsSuccess() {
// Create and Create2 return the same things.
var ret eam.CreateExternalReturn
if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil {
if err := ret.UnmarshalCBOR(bytes.NewReader(replay.MsgRct.Return)); err != nil {
return api.EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err)
}
addr := ethtypes.EthAddress(ret.EthAddress)

View File

@ -82,11 +82,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
cfg := &splitstore.Config{
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
UniversalColdBlocks: cfg.Splitstore.ColdStoreType == "universal",
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
HotStoreFullGCFrequency: cfg.Splitstore.HotStoreFullGCFrequency,
HotstoreMaxSpaceTarget: cfg.Splitstore.HotStoreMaxSpaceTarget,
HotstoreMaxSpaceThreshold: cfg.Splitstore.HotStoreMaxSpaceThreshold,
HotstoreMaxSpaceSafetyBuffer: cfg.Splitstore.HotstoreMaxSpaceSafetyBuffer,
}
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil {

View File

@ -123,7 +123,7 @@ func NetworkName(mctx helpers.MetricsCtx,
ctx := helpers.LifecycleCtx(mctx, lc)
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil)
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil)
if err != nil {
return "", err
}

View File

@ -7,10 +7,11 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule) (*stmgr.StateManager, error) {
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b)
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule, metadataDs dtypes.MetadataDS) (*stmgr.StateManager, error) {
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b, metadataDs)
if err != nil {
return nil, err
}