Merge remote-tracking branch 'origin/master' into feat/split-net-api

Łukasz Magiera 2021-07-22 15:38:06 +02:00
commit 660829703a
37 changed files with 612 additions and 113 deletions

.github/CODEOWNERS vendored Normal file
View File

@ -0,0 +1,6 @@
# Reference
# https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/creating-a-repository-on-github/about-code-owners
# Global owners
# Ensure maintainers team is a requested reviewer for non-draft PRs
* @filecoin-project/lotus-maintainers

View File

@ -1,5 +1,38 @@
# Lotus changelog
# 1.10.1 / 2021-07-05
This is an optional but **highly recommended** release of Lotus for lotus miners; it includes many bug fixes and improvements based on the feedback we got from the community since HyperDrive.
## New Features
- commit batch: AggregateAboveBaseFee config #6650
- `AggregateAboveBaseFee` is added to the miner sealing configuration to set the network base fee at which to start aggregating proofs. When the network base fee is lower than this value, prove commits will be submitted individually via `ProveCommitSector`. In line with the [Batch Incentive Alignment](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0013.md#batch-incentive-alignment) introduced in FIP-0013, we recommend miners set this value to 0.15 nanoFIL (the default) to avoid unexpected aggregation fees being burned and to get the most benefit from aggregation!
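  A minimal, self-contained sketch of the decision this setting drives; the plain `math/big` values are hypothetical stand-ins for the FIL types Lotus uses internally:

  ```go
  package main

  import (
  	"fmt"
  	"math/big"
  )

  func main() {
  	// 0.15 nanoFIL expressed in attoFIL (1 nanoFIL = 10^9 attoFIL) -- the default threshold
  	aggregateAboveBaseFee := big.NewInt(150_000_000)

  	// an example network base fee, 0.1 nanoFIL in attoFIL
  	baseFee := big.NewInt(100_000_000)

  	if baseFee.Cmp(aggregateAboveBaseFee) < 0 {
  		fmt.Println("base fee below threshold: submit prove commits individually (ProveCommitSector)")
  	} else {
  		fmt.Println("base fee at/above threshold: submit aggregated prove commits (ProveCommitAggregate)")
  	}
  }
  ```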
## Bug Fixes
- storage: Fix FinalizeSector with sectors in storage paths #6652
- Fix tiny error in check-client-datacap #6664
- Fix: precommit_batch method used the wrong cfg.PreCommitBatchWait #6658
- Optimize the batch wait #6636
- fix getTicket: sector precommitted but expired case #6635
- handleSubmitCommitAggregate() exception handling #6595
- remove precommit check in handleCommitFailed #6634
- ensure agg fee is adequate
- fix: miner balance is not enough, so that ProveCommitAggregate msg exec failed #6623
- commit batch: Initialize the FailedSectors map #6647
## Contributors
| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| @magik6k | 7 | +151/-56 | 21 |
| @llifezou | 4 | +59/-20 | 4 |
| @johnli-helloworld | 2 | +45/-14 | 4 |
| @wangchao | 1 | +1/-27 | 1 |
| Jerry | 2 | +9/-4 | 2 |
| @zhoutian527 | 1 | +2/-2 | 1 |
| @ribasushi | 1 | +1/-1 | 1 |
# 1.10.0 / 2021-06-23
This is a mandatory release of Lotus that introduces Filecoin network v13, codenamed the HyperDrive upgrade. The

View File

@ -45,6 +45,7 @@ type Gateway interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error)
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error) //perm:read
StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error)
StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
StateMarketBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MarketBalance, error)

View File

@ -510,6 +510,8 @@ type GatewayStruct struct {
StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) ``
StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) `perm:"read"`
StateSearchMsg func(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) ``
StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) ``
@ -2557,6 +2559,14 @@ func (s *GatewayStub) StateNetworkVersion(p0 context.Context, p1 types.TipSetKey
return *new(apitypes.NetworkVersion), xerrors.New("method not supported")
}
func (s *GatewayStruct) StateReadState(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) {
return s.Internal.StateReadState(p0, p1, p2)
}
func (s *GatewayStub) StateReadState(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) {
return nil, xerrors.New("method not supported")
}
func (s *GatewayStruct) StateSearchMsg(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) {
return s.Internal.StateSearchMsg(p0, p1, p2, p3, p4)
}

View File

@ -71,6 +71,13 @@ type Config struct {
// which skips moving (as it is a noop, but still takes time to read all the cold objects)
// and directly purges cold blocks.
DiscardColdBlocks bool
// HotStoreMessageRetention indicates the hotstore retention policy for messages.
// It has the following semantics:
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
// - a positive integer indicates the number of finalities, outside the compaction boundary,
// for which messages will be retained in the hotstore.
HotStoreMessageRetention uint64
}
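// A worked sketch of the retention arithmetic (hypothetical numbers, assuming
// build.Finality = 900 epochs and the 4-finality compaction boundary): with
// HotStoreMessageRetention = 2, messages are retained back to
// boundaryEpoch - 2*900 epochs, i.e. about 6 finalities behind the current
// head, while state is still only retained to the compaction boundary.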
// ChainAccessor allows the Splitstore to access the chain. It will most likely
@ -128,6 +135,9 @@ type SplitStore struct {
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
// registered protectors
protectors []func(func(cid.Cid) error) error
}
var _ bstore.Blockstore = (*SplitStore)(nil)
@ -520,6 +530,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return nil
}
func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) {
s.mx.Lock()
defer s.mx.Unlock()
s.protectors = append(s.protectors, protector)
}
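// Usage sketch (hypothetical): register a protector that keeps a specific CID
// from being moved to cold storage across compactions, e.g.
//
//	ss.AddProtector(func(protect func(cid.Cid) error) error {
//		return protect(myImportantCid)
//	})
//
// The message pool hooks in the same way by registering ForEachPendingMessage.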
func (s *SplitStore) Close() error {
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing

View File

@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
})
}
func (s *SplitStore) applyProtectors() error {
s.mx.Lock()
defer s.mx.Unlock()
count := 0
for _, protect := range s.protectors {
err := protect(func(c cid.Cid) error {
s.trackTxnRef(c)
count++
return nil
})
if err != nil {
return xerrors.Errorf("error applynig protector: %w", err)
}
}
if count > 0 {
log.Infof("protected %d references through %d protectors", count, len(s.protectors))
}
return nil
}
// --- Compaction ---
// Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
@ -376,7 +400,13 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
currentEpoch := curTs.Height()
boundaryEpoch := currentEpoch - CompactionBoundary
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "compactionIndex", s.compactionIndex)
var inclMsgsEpoch abi.ChainEpoch
inclMsgsRange := abi.ChainEpoch(s.cfg.HotStoreMessageRetention) * build.Finality
if inclMsgsRange < boundaryEpoch {
inclMsgsEpoch = boundaryEpoch - inclMsgsRange
}
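// note: if the retention range reaches past genesis (inclMsgsRange >= boundaryEpoch),
// inclMsgsEpoch stays 0 and messages are retained all the way back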
log.Infow("running compaction", "currentEpoch", currentEpoch, "baseEpoch", s.baseEpoch, "boundaryEpoch", boundaryEpoch, "inclMsgsEpoch", inclMsgsEpoch, "compactionIndex", s.compactionIndex)
markSet, err := s.markSetEnv.Create("live", s.markSetSize)
if err != nil {
@ -392,13 +422,21 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are ready for concurrent marking
s.beginTxnMarking(markSet)
// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
err = s.applyProtectors()
if err != nil {
return err
}
// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
startMark := time.Now()
var count int64
err = s.walkChain(curTs, boundaryEpoch, true,
err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch,
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk
@ -593,7 +631,7 @@ func (s *SplitStore) endTxnProtect() {
s.txnMissing = nil
}
func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
func (s *SplitStore) walkChain(ts *types.TipSet, inclState abi.ChainEpoch, inclMsgs abi.ChainEpoch,
f func(cid.Cid) error) error {
visited := cid.NewSet()
walked := cid.NewSet()
@ -621,14 +659,25 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
}
// we only scan the block if it is at or above the boundary
if hdr.Height >= boundary || hdr.Height == 0 {
scanCnt++
if inclMsgs && hdr.Height > 0 {
// messages are retained if within the inclMsgs boundary
if hdr.Height >= inclMsgs && hdr.Height > 0 {
if inclMsgs < inclState {
// we need to use walkObjectIncomplete here, as messages may be missing early on if we
// synced from snapshot and have a long HotStoreMessageRetention.
stopWalk := func(_ cid.Cid) error { return errStopWalk }
if err := s.walkObjectIncomplete(hdr.Messages, walked, f, stopWalk); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
} else {
if err := s.walkObject(hdr.Messages, walked, f); err != nil {
return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
}
}
}
// state and message receipts are only retained if within the inclState boundary
if hdr.Height >= inclState || hdr.Height == 0 {
if hdr.Height > 0 {
if err := s.walkObject(hdr.ParentMessageReceipts, walked, f); err != nil {
return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
}
@ -637,6 +686,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMs
if err := s.walkObject(hdr.ParentStateRoot, walked, f); err != nil {
return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
}
scanCnt++
}
if hdr.Height > 0 {

View File

@ -0,0 +1,114 @@
package splitstore
import (
"context"
"errors"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/filecoin-project/lotus/blockstore"
)
type exposedSplitStore struct {
s *SplitStore
}
var _ bstore.Blockstore = (*exposedSplitStore)(nil)
func (s *SplitStore) Expose() bstore.Blockstore {
return &exposedSplitStore{s: s}
}
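// Usage sketch (hypothetical): the exposed blockstore reads through the hot
// store and falls back to the cold store, so callers can reach objects that
// compaction has already moved cold:
//
//	exposed := ss.Expose()
//	blk, err := exposed.Get(someCid) // tries hot first, then cold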
func (es *exposedSplitStore) DeleteBlock(_ cid.Cid) error {
return errors.New("DeleteBlock: operation not supported")
}
func (es *exposedSplitStore) DeleteMany(_ []cid.Cid) error {
return errors.New("DeleteMany: operation not supported")
}
func (es *exposedSplitStore) Has(c cid.Cid) (bool, error) {
if isIdentiyCid(c) {
return true, nil
}
has, err := es.s.hot.Has(c)
if has || err != nil {
return has, err
}
return es.s.cold.Has(c)
}
func (es *exposedSplitStore) Get(c cid.Cid) (blocks.Block, error) {
if isIdentiyCid(c) {
data, err := decodeIdentityCid(c)
if err != nil {
return nil, err
}
return blocks.NewBlockWithCid(data, c)
}
blk, err := es.s.hot.Get(c)
switch err {
case bstore.ErrNotFound:
return es.s.cold.Get(c)
default:
return blk, err
}
}
func (es *exposedSplitStore) GetSize(c cid.Cid) (int, error) {
if isIdentiyCid(c) {
data, err := decodeIdentityCid(c)
if err != nil {
return 0, err
}
return len(data), nil
}
size, err := es.s.hot.GetSize(c)
switch err {
case bstore.ErrNotFound:
return es.s.cold.GetSize(c)
default:
return size, err
}
}
func (es *exposedSplitStore) Put(blk blocks.Block) error {
return es.s.Put(blk)
}
func (es *exposedSplitStore) PutMany(blks []blocks.Block) error {
return es.s.PutMany(blks)
}
func (es *exposedSplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return es.s.AllKeysChan(ctx)
}
func (es *exposedSplitStore) HashOnRead(enabled bool) {}
func (es *exposedSplitStore) View(c cid.Cid, f func([]byte) error) error {
if isIdentiyCid(c) {
data, err := decodeIdentityCid(c)
if err != nil {
return err
}
return f(data)
}
err := es.s.hot.View(c, f)
switch err {
case bstore.ErrNotFound:
return es.s.cold.View(c, f)
default:
return err
}
}

View File

@ -63,6 +63,20 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Fatal(err)
}
// create a garbage block that is protected with a registered protector
protected := blocks.NewBlock([]byte("protected!"))
err = hot.Put(protected)
if err != nil {
t.Fatal(err)
}
// and another one that is not protected
unprotected := blocks.NewBlock([]byte("unprotected!"))
err = hot.Put(unprotected)
if err != nil {
t.Fatal(err)
}
// open the splitstore
ss, err := Open("", ds, hot, cold, cfg)
if err != nil {
@ -70,6 +84,11 @@ func testSplitStore(t *testing.T, cfg *Config) {
}
defer ss.Close() //nolint
// register our protector
ss.AddProtector(func(protect func(cid.Cid) error) error {
return protect(protected.Cid())
})
err = ss.Start(chain)
if err != nil {
t.Fatal(err)
@ -132,8 +151,8 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}
if hotCnt != 10 {
t.Errorf("expected %d blocks, but got %d", 10, hotCnt)
if hotCnt != 12 {
t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
}
// trigger a compaction
@ -146,12 +165,41 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot)
if coldCnt != 5 {
t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
if coldCnt != 6 {
t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt)
}
if hotCnt != 17 {
t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt)
if hotCnt != 18 {
t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt)
}
// ensure our protected block is still there
has, err := hot.Has(protected.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("protected block is missing from hotstore")
}
// ensure our unprotected block is in the coldstore now
has, err = hot.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("unprotected block is still in hotstore")
}
has, err = cold.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("unprotected block is missing from coldstore")
}
// Make sure we can revert without panicking.

View File

@ -48,7 +48,7 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
count := int64(0)
xcount := int64(0)
missing := int64(0)
err := s.walkChain(curTs, epoch, false,
err := s.walkChain(curTs, epoch, epoch+1, // we don't load messages in warmup
func(c cid.Cid) error {
if isUnitaryObject(c) {
return errStopWalk

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -5,6 +5,9 @@ import (
"math"
"sync"
"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/go-state-types/abi"
@ -464,14 +467,20 @@ type messageEvents struct {
lk sync.RWMutex
matchers map[triggerID]MsgMatchFunc
blockMsgLk sync.Mutex
blockMsgCache *lru.ARCCache
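// caches ChainGetBlockMessages results by block CID so that re-processing the
// same block across tipsets does not refetch its messages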
}
func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs EventAPI) messageEvents {
blsMsgCache, _ := lru.NewARC(500)
return messageEvents{
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
ctx: ctx,
cs: cs,
hcAPI: hcAPI,
matchers: make(map[triggerID]MsgMatchFunc),
blockMsgLk: sync.Mutex{},
blockMsgCache: blsMsgCache,
}
}
@ -515,14 +524,21 @@ func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Mes
seen := map[cid.Cid]struct{}{}
for _, tsb := range ts.Blocks() {
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
continue
me.blockMsgLk.Lock()
msgsI, ok := me.blockMsgCache.Get(tsb.Cid())
var err error
if !ok {
msgsI, err = me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
if err != nil {
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
// this is quite bad, but probably better than missing all the other updates
me.blockMsgLk.Unlock()
continue
}
me.blockMsgCache.Add(tsb.Cid(), msgsI)
}
me.blockMsgLk.Unlock()
msgs := msgsI.(*api.BlockMessages)
for _, m := range msgs.BlsMessages {
_, ok := seen[m.Cid()]
if ok {

View File

@ -6,6 +6,8 @@ import (
"sync"
"testing"
"gotest.tools/assert"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
@ -44,25 +46,43 @@ type fakeCS struct {
tipsets map[types.TipSetKey]*types.TipSet
sub func(rev, app []*types.TipSet)
callNumberLk sync.Mutex
callNumber map[string]int
}
func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainHead"] = fcs.callNumber["ChainHead"] + 1
panic("implement me")
}
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetTipSet"] = fcs.callNumber["ChainGetTipSet"] + 1
return fcs.tipsets[key], nil
}
func (fcs *fakeCS) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["StateSearchMsg"] = fcs.callNumber["StateSearchMsg"] + 1
return nil, nil
}
func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["StateGetActor"] = fcs.callNumber["StateGetActor"] + 1
panic("Not Implemented")
}
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetTipSetByHeight"] = fcs.callNumber["ChainGetTipSetByHeight"] + 1
panic("Not Implemented")
}
@ -113,6 +133,10 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg
}
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainNotify"] = fcs.callNumber["ChainNotify"] + 1
out := make(chan []*api.HeadChange, 1)
best, err := fcs.tsc.best()
if err != nil {
@ -143,6 +167,9 @@ func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error
}
func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) {
fcs.callNumberLk.Lock()
defer fcs.callNumberLk.Unlock()
fcs.callNumber["ChainGetBlockMessages"] = fcs.callNumber["ChainGetBlockMessages"] + 1
messages, ok := fcs.blkMsgs[blk]
if !ok {
return &api.BlockMessages{}, nil
@ -152,8 +179,8 @@ func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api
if !ok {
return &api.BlockMessages{}, nil
}
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil
}
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
@ -233,9 +260,10 @@ var _ EventAPI = &fakeCS{}
func TestAt(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -298,9 +326,10 @@ func TestAt(t *testing.T) {
func TestAtDoubleTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -340,9 +369,10 @@ func TestAtDoubleTrigger(t *testing.T) {
func TestAtNullTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -374,9 +404,10 @@ func TestAtNullTrigger(t *testing.T) {
func TestAtNullConf(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -413,9 +444,10 @@ func TestAtNullConf(t *testing.T) {
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -447,9 +479,10 @@ func TestAtStart(t *testing.T) {
func TestAtStartConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -477,9 +510,10 @@ func TestAtStartConfidence(t *testing.T) {
func TestAtChained(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -511,9 +545,10 @@ func TestAtChained(t *testing.T) {
func TestAtChainedConfidence(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -545,9 +580,10 @@ func TestAtChainedConfidence(t *testing.T) {
func TestAtChainedConfidenceNull(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -583,9 +619,10 @@ func TestCalled(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -795,9 +832,10 @@ func TestCalledTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -835,9 +873,10 @@ func TestCalledTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -869,9 +908,10 @@ func TestCalledOrder(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -932,9 +972,10 @@ func TestCalledNull(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -997,9 +1038,10 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1087,9 +1129,10 @@ func TestStateChanged(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1175,9 +1218,10 @@ func TestStateChangedRevert(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1253,9 +1297,10 @@ func TestStateChangedTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
callNumber: map[string]int{},
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1293,9 +1338,10 @@ func TestStateChangedTimeout(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1329,9 +1375,10 @@ func TestCalledMultiplePerEpoch(t *testing.T) {
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
@ -1382,3 +1429,24 @@ func TestCalledMultiplePerEpoch(t *testing.T) {
fcs.advance(9, 1, nil)
}
func TestCachedSameBlock(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
callNumber: map[string]int{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
_ = NewEvents(context.Background(), fcs)
fcs.advance(0, 10, map[int]cid.Cid{})
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 20, "expected %d calls to ChainGetBlockMessages but got %d", 20, fcs.callNumber["ChainGetBlockMessages"])
fcs.advance(5, 10, map[int]cid.Cid{})
assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 30, "expected %d calls to ChainGetBlockMessages but got %d", 30, fcs.callNumber["ChainGetBlockMessages"])
}

View File

@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
return mp, nil
}
func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error {
mp.lk.Lock()
defer mp.lk.Unlock()
for _, mset := range mp.pending {
for _, m := range mset.msgs {
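// protect both the signed message CID and the underlying unsigned message CID,
// since either may be referenced by chain objects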
err := f(m.Cid())
if err != nil {
return err
}
err = f(m.Message.Cid())
if err != nil {
return err
}
}
}
return nil
}
func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, f := mp.keyCache[addr]

View File

@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil
}
func (tma *testMpoolAPI) IsLite() bool {
return false
}

View File

@ -446,6 +446,9 @@ var StateExecTraceCmd = &cli.Command{
if err != nil {
return err
}
if lookup == nil {
return fmt.Errorf("failed to find message: %s", mcid)
}
ts, err := capi.ChainGetTipSet(ctx, lookup.TipSet)
if err != nil {
@ -1491,6 +1494,10 @@ var StateSearchMsgCmd = &cli.Command{
return err
}
if mw == nil {
return fmt.Errorf("failed to find message: %s", msg)
}
m, err := api.ChainGetMessage(ctx, msg)
if err != nil {
return err

View File

@ -337,6 +337,9 @@ func resolveFromChain(ctx context.Context, api v0api.FullNode, mcid cid.Cid, blo
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to locate message: %w", err)
}
if msgInfo == nil {
return nil, nil, nil, fmt.Errorf("failed to locate message: not found")
}
log.Printf("located message at tipset %s (height: %d) with exit code: %s", msgInfo.TipSet, msgInfo.Height, msgInfo.Receipt.ExitCode)

View File

@ -69,6 +69,10 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
}
if lookup == nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: not found", publishCid)
}
if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
}

View File

@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/require"
)
var errNotFound = errors.New("Could not find")
var errNotFound = errors.New("could not find")
func TestGetCurrentDealInfo(t *testing.T) {
ctx := context.Background()
@ -180,6 +180,12 @@ func TestGetCurrentDealInfo(t *testing.T) {
expectedDealID: zeroDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid),
},
"search message not found": {
publishCid: dummyCid,
targetProposal: &proposal,
expectedDealID: zeroDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: not found", dummyCid),
},
"return code not ok": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{

View File

@ -358,8 +358,11 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}
params, pcd, tok, err := m.preCommitParams(ctx, sector)
if params == nil || err != nil {
return err
if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)})
}
if params == nil {
return nil // event was sent in preCommitParams
}
deposit, err := collateralSendAmount(ctx.Context(), m.api, m.maddr, cfg, pcd)
@ -403,9 +406,12 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se
}
params, deposit, _, err := m.preCommitParams(ctx, sector)
if params == nil || err != nil {
if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)})
}
if params == nil {
return nil // event was sent in preCommitParams
}
res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
if err != nil {

go.mod
View File

@ -110,7 +110,7 @@ require (
github.com/libp2p/go-libp2p-mplex v0.4.1
github.com/libp2p/go-libp2p-noise v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.7
github.com/libp2p/go-libp2p-pubsub v0.4.2-0.20210212194758-6c1addf493eb
github.com/libp2p/go-libp2p-pubsub v0.5.0
github.com/libp2p/go-libp2p-quic-transport v0.10.0
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-routing-helpers v0.2.3

go.sum
View File

@ -934,6 +934,7 @@ github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.5 h1:aEgbIcPGsKy6zYcC+5AJivYFedhYa4sW7mIpWpUaLKw=
github.com/libp2p/go-libp2p-core v0.8.5/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
@ -1010,8 +1011,8 @@ github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
github.com/libp2p/go-libp2p-pubsub v0.1.1/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h1:TxPOBuo1FPdsTjFnv+FGZbNbWYsp74Culx+4ViQpato=
github.com/libp2p/go-libp2p-pubsub v0.4.2-0.20210212194758-6c1addf493eb h1:HExLcdXn8fgtXPciUw97O5NNhBn31dt6d9fVUD4cngo=
github.com/libp2p/go-libp2p-pubsub v0.4.2-0.20210212194758-6c1addf493eb/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ=
github.com/libp2p/go-libp2p-pubsub v0.5.0 h1:OzcIuCWyJpOrWH0PTOfvxTzqFur4tiXpY5jXC8OxjyE=
github.com/libp2p/go-libp2p-pubsub v0.5.0/go.mod h1:MKnrsQkFgPcrQs1KVmOXy6Uz2RDQ1xO7dQo/P0Ba+ig=
github.com/libp2p/go-libp2p-quic-transport v0.1.1/go.mod h1:wqG/jzhF3Pu2NrhJEvE+IE0NTHNXslOPn9JQzyCAxzU=
github.com/libp2p/go-libp2p-quic-transport v0.5.0/go.mod h1:IEcuC5MLxvZ5KuHKjRu+dr3LjCT1Be3rcD/4d8JrX8M=
github.com/libp2p/go-libp2p-quic-transport v0.10.0 h1:koDCbWD9CCHwcHZL3/WEvP2A+e/o5/W5L3QS/2SPMA0=
@ -1043,6 +1044,7 @@ github.com/libp2p/go-libp2p-swarm v0.2.7/go.mod h1:ZSJ0Q+oq/B1JgfPHJAT2HTall+xYR
github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM=
github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.3.1/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk=
github.com/libp2p/go-libp2p-swarm v0.4.3/go.mod h1:mmxP1pGBSc1Arw4F5DIjcpjFAmsRzA1KADuMtMuCT4g=
github.com/libp2p/go-libp2p-swarm v0.5.0 h1:HIK0z3Eqoo8ugmN8YqWAhD2RORgR+3iNXYG4U2PFd1E=
github.com/libp2p/go-libp2p-swarm v0.5.0/go.mod h1:sU9i6BoHE0Ve5SKz3y9WfKrh8dUat6JknzUehFx8xW4=
github.com/libp2p/go-libp2p-testing v0.0.1/go.mod h1:gvchhf3FQOtBdr+eFUABet5a4MBLK8jM3V4Zghvmi+E=

View File

@ -121,6 +121,7 @@ func (ts *apiSuite) testSearchMsg(t *testing.T) {
searchRes, err := full.StateSearchMsg(ctx, types.EmptyTSK, sm.Cid(), lapi.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, searchRes)
require.Equalf(t, res.TipSet, searchRes.TipSet, "search ts: %s, different from wait ts: %s", searchRes.TipSet, res.TipSet)
}

View File

@ -213,12 +213,18 @@ func TestWindowPostBaseFeeNoBurn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sched := kit.DefaultTestUpgradeSchedule
lastUpgradeHeight := sched[len(sched)-1].Height
och := build.UpgradeClausHeight
build.UpgradeClausHeight = 10
build.UpgradeClausHeight = lastUpgradeHeight + 1
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs())
ens.InterconnectAll().BeginMining(blocktime)
// Wait till all upgrades are done and we've passed the clause epoch.
client.WaitTillChain(ctx, kit.HeightAtLeast(build.UpgradeClausHeight+1))
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)
@ -268,6 +274,12 @@ func TestWindowPostBaseFeeBurn(t *testing.T) {
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts)
ens.InterconnectAll().BeginMining(blocktime)
// Ideally we'd be a bit more precise here, but getting the information we need from the
// test framework is more work than it's worth.
//
// We just need to wait till all upgrades are done.
client.WaitTillChain(ctx, kit.HeightAtLeast(20))
maddr, err := miner.ActorAddress(ctx)
require.NoError(t, err)

View File

@ -78,27 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
})
}
type waitReadCloser struct {
// watchReadCloser watches the ReadCloser and closes the watch channel when
// either: (1) the ReadCloser fails on Read (including with a benign error
// like EOF), or (2) Close is called.
//
// Use it to be notified of terminal states, in situations where a Read failure
// (or EOF) is considered a terminal state too (besides Close).
type watchReadCloser struct {
io.ReadCloser
wait chan struct{}
watch chan struct{}
closeOnce sync.Once
}
func (w *waitReadCloser) Read(p []byte) (int, error) {
func (w *watchReadCloser) Read(p []byte) (int, error) {
n, err := w.ReadCloser.Read(p)
if err != nil {
close(w.wait)
w.closeOnce.Do(func() {
close(w.watch)
})
}
return n, err
}
func (w *waitReadCloser) Close() error {
close(w.wait)
func (w *watchReadCloser) Close() error {
w.closeOnce.Do(func() {
close(w.watch)
})
return w.ReadCloser.Close()
}
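// Usage sketch (hypothetical): wrap a request body and select on the watch
// channel to learn when the reader has reached a terminal state:
//
//	wr := &watchReadCloser{ReadCloser: req.Body, watch: make(chan struct{})}
//	// ... hand wr to a consumer that Reads from it ...
//	select {
//	case <-wr.watch: // Read failed (possibly io.EOF) or Close was called
//	case <-ctx.Done():
//	}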
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
var readersLk sync.Mutex
readers := map[uuid.UUID]chan *waitReadCloser{}
readers := map[uuid.UUID]chan *watchReadCloser{}
hnd := func(resp http.ResponseWriter, req *http.Request) {
strId := path.Base(req.URL.Path)
@ -111,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
ch = make(chan *watchReadCloser)
readers[u] = ch
}
readersLk.Unlock()
wr := &waitReadCloser{
wr := &watchReadCloser{
ReadCloser: req.Body,
wait: make(chan struct{}),
watch: make(chan struct{}),
}
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
@ -134,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
}
select {
case <-wr.wait:
case <-wr.watch:
// TODO should we check if we failed the Read, and if so
// return an HTTP 500? i.e. turn watch into a chan error?
case <-req.Context().Done():
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
resp.WriteHeader(500)
@ -167,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
ch = make(chan *watchReadCloser)
readers[u] = ch
}
readersLk.Unlock()

View File

@ -54,7 +54,7 @@ func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, min
func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length)
si, err := rpn.sectorsStatus(ctx, sectorID, true)
si, err := rpn.sectorsStatus(ctx, sectorID, false)
if err != nil {
return nil, err
}

View File

@ -25,6 +25,7 @@ import (
)
func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
testCases := []struct {
name string
publishPeriod time.Duration

View File

@ -311,13 +311,15 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.BasicChainBlockstore), modules.ChainSplitBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore),
Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector),
),
If(!cfg.EnableSplitstore,
Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector),
),
Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))),

View File

@ -251,6 +251,8 @@ type Splitstore struct {
ColdStoreType string
HotStoreType string
MarkSetType string
HotStoreMessageRetention uint64
}
// // Full Node

View File

@ -436,7 +436,19 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid)
if piece != nil && !piece.Equals(*p.PieceCID) {
continue
}
out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{}))
// do not rely on local data for the peer ID;
// fetch an up-to-date miner peer ID from the chain
mi, err := a.StateMinerInfo(ctx, p.Address, types.EmptyTSK)
if err != nil {
return nil, err
}
pp := rm.RetrievalPeer{
Address: p.Address,
ID: *mi.PeerId,
}
out = append(out, a.makeRetrievalQuery(ctx, pp, root, piece, rm.QueryParams{}))
}
return out, nil

View File

@ -78,8 +78,9 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
cfg := &splitstore.Config{
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
MarkSetType: cfg.Splitstore.MarkSetType,
DiscardColdBlocks: cfg.Splitstore.ColdStoreType == "discard",
HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
}
ss, err := splitstore.Open(path, ds, hot, cold, cfg)
if err != nil {
@ -95,6 +96,18 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
}
func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector {
return s.(dtypes.GCReferenceProtector)
}
func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector {
return dtypes.NoopGCReferenceProtector{}
}
func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore {
return s.(*splitstore.SplitStore).Expose()
}
func StateFlatBlockstore(_ fx.Lifecycle, _ helpers.MetricsCtx, bs dtypes.UniversalBlockstore) (dtypes.BasicStateBlockstore, error) {
return bs, nil
}

View File

@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem)
}
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS
return mp.Close()
},
})
protector.AddProtector(mp.ForEachPendingMessage)
return mp, nil
}

View File

@ -0,0 +1,13 @@
package dtypes
import (
cid "github.com/ipfs/go-cid"
)
type GCReferenceProtector interface {
AddProtector(func(func(cid.Cid) error) error)
}
type NoopGCReferenceProtector struct{}
func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}
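// Usage sketch (hypothetical): the splitstore satisfies this interface through
// its AddProtector method, while nodes without a splitstore are wired up with
// the no-op implementation:
//
//	var p GCReferenceProtector = NoopGCReferenceProtector{}
//	p.AddProtector(mp.ForEachPendingMessage) // does nothing here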

View File

@ -327,6 +327,21 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain
return
}
//
// Tri-state environment variable LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC
// - unset == the default (currently fsync enabled)
// - set with a false-y value == fsync enabled no matter what a future default is
// - set with any other value == fsync is disabled no matter what a future default is (recommended for day-to-day use)
//
if nosyncBs, nosyncBsSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC"); nosyncBsSet {
nosyncBs = strings.ToLower(nosyncBs)
if nosyncBs == "" || nosyncBs == "0" || nosyncBs == "false" || nosyncBs == "no" {
opts.SyncWrites = true
} else {
opts.SyncWrites = false
}
}
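// examples (hypothetical): LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC=1 disables
// fsync for the badger blockstore, while LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC=false
// (or "0", "no", or the empty string) keeps it enabled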
bs, err := badgerbs.Open(opts)
if err != nil {
fsr.bsErr = err

View File

@ -134,7 +134,7 @@ func (m *Miner) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnC
LastErr: info.LastErr,
Log: log,
// on chain info
SealProof: 0,
SealProof: info.SectorType,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),

View File

@ -659,6 +659,7 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, di dline.Info, t
if !bytes.Equal(checkRand, rand) {
log.Warnw("windowpost randomness changed", "old", rand, "new", checkRand, "ts-height", ts.Height(), "challenge-height", di.Challenge, "tsk", ts.Key())
rand = checkRand
continue
}