diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..b8ec66f00 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,6 @@ +# Reference +# https://docs.github.com/en/github/creating-cloning-and-archiving-repositories/creating-a-repository-on-github/about-code-owners + +# Global owners +# Ensure maintainers team is a requested reviewer for non-draft PRs +* @filecoin-project/lotus-maintainers diff --git a/CHANGELOG.md b/CHANGELOG.md index 11f7d300b..b45c6236d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,38 @@ # Lotus changelog +# 1.10.1 / 2021-07-05 + +This is an optional but **highly recommended** release of Lotus for lotus miners, with many bug fixes and improvements based on the feedback we got from the community since HyperDrive. + +## New Features +- commit batch: AggregateAboveBaseFee config #6650 + - `AggregateAboveBaseFee` is added to the miner sealing configuration for setting the network base fee at which to start aggregating proofs. When the network base fee is lower than this value, prove commits will be submitted individually via `ProveCommitSector`. According to the [Batch Incentive Alignment](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0013.md#batch-incentive-alignment) introduced in FIP-0013, we recommend that miners set this value to 0.15 nanoFIL (which is the default value) to avoid unexpected aggregation fees being burned and to get the most benefit from aggregation! + +## Bug Fixes +- storage: Fix FinalizeSector with sectors in storage paths #6652 +- Fix tiny error in check-client-datacap #6664 +- Fix: precommit_batch method used the wrong cfg.PreCommitBatchWait #6658 +- Optimize the batchwait #6636 +- fix getTicket: sector precommitted but expired case #6635 +- handleSubmitCommitAggregate() exception handling #6595 +- remove precommit check in handleCommitFailed #6634 +- ensure agg fee is adequate +- fix: miner balance is not enough, so that ProveCommitAggregate msg exec failed #6623 +- commit batch: Initialize the FailedSectors map #6647 + +## Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| @magik6k | 7 | +151/-56 | 21 | +| @llifezou | 4 | +59/-20 | 4 | +| @johnli-helloworld | 2 | +45/-14 | 4 | +| @wangchao | 1 | +1/-27 | 1 | +| Jerry | 2 | +9/-4 | 2 | +| @zhoutian527 | 1 | +2/-2 | 1 | +| @ribasushi | 1 | +1/-1 | 1 | + + # 1.10.0 / 2021-06-23 This is a mandatory release of Lotus that introduces Filecoin network v13, codenamed the HyperDrive upgrade.
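A minimal, hedged Go sketch of the `AggregateAboveBaseFee` behaviour described in the 1.10.1 notes above: when the current network base fee is below the configured threshold, prove commits are sent individually via `ProveCommitSector` rather than aggregated. The helper name `shouldAggregate` and the way the values are passed in are illustrative assumptions, not the actual Lotus batcher code.

```go
package main

import (
	"fmt"
	"math/big"
)

// shouldAggregate is a hypothetical helper, not the real batcher logic:
// aggregate only when the current network base fee is at or above the threshold.
func shouldAggregate(networkBaseFee, aggregateAboveBaseFee *big.Int) bool {
	return networkBaseFee.Cmp(aggregateAboveBaseFee) >= 0
}

func main() {
	// 0.15 nanoFIL expressed in attoFIL (1 nanoFIL = 1e9 attoFIL), the default mentioned above.
	threshold := big.NewInt(150_000_000)
	// An example current network base fee, below the threshold.
	baseFee := big.NewInt(100_000_000)

	if shouldAggregate(baseFee, threshold) {
		fmt.Println("submit a single ProveCommitAggregate message")
	} else {
		fmt.Println("submit individual ProveCommitSector messages")
	}
}
```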
The diff --git a/api/api_gateway.go b/api/api_gateway.go index 0ee66ac17..6db1c8e45 100644 --- a/api/api_gateway.go +++ b/api/api_gateway.go @@ -45,6 +45,7 @@ type Gateway interface { StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateDealProviderCollateralBounds(ctx context.Context, size abi.PaddedPieceSize, verified bool, tsk types.TipSetKey) (DealCollateralBounds, error) StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) + StateReadState(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*ActorState, error) //perm:read StateListMiners(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateMarketBalance(ctx context.Context, addr address.Address, tsk types.TipSetKey) (MarketBalance, error) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index e8ea27469..0fd12425d 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -525,6 +525,8 @@ type GatewayStruct struct { StateNetworkVersion func(p0 context.Context, p1 types.TipSetKey) (apitypes.NetworkVersion, error) `` + StateReadState func(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) `perm:"read"` + StateSearchMsg func(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) `` StateSectorGetInfo func(p0 context.Context, p1 address.Address, p2 abi.SectorNumber, p3 types.TipSetKey) (*miner.SectorOnChainInfo, error) `` @@ -2663,6 +2665,14 @@ func (s *GatewayStub) StateNetworkVersion(p0 context.Context, p1 types.TipSetKey return *new(apitypes.NetworkVersion), xerrors.New("method not supported") } +func (s *GatewayStruct) StateReadState(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) { + return s.Internal.StateReadState(p0, p1, p2) +} + +func (s *GatewayStub) StateReadState(p0 context.Context, p1 address.Address, p2 types.TipSetKey) (*ActorState, error) { + return nil, xerrors.New("method not supported") +} + func (s *GatewayStruct) StateSearchMsg(p0 context.Context, p1 types.TipSetKey, p2 cid.Cid, p3 abi.ChainEpoch, p4 bool) (*MsgLookup, error) { return s.Internal.StateSearchMsg(p0, p1, p2, p3, p4) } diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index 3da9bd141..821ebb2b6 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -135,6 +135,9 @@ type SplitStore struct { txnRefsMx sync.Mutex txnRefs map[cid.Cid]struct{} txnMissing map[cid.Cid]struct{} + + // registered protectors + protectors []func(func(cid.Cid) error) error } var _ bstore.Blockstore = (*SplitStore)(nil) @@ -527,6 +530,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error { return nil } +func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) { + s.mx.Lock() + defer s.mx.Unlock() + + s.protectors = append(s.protectors, protector) +} + func (s *SplitStore) Close() error { if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) { // already closing diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go index 1fc46b9fb..86f035e6f 100644 --- a/blockstore/splitstore/splitstore_compact.go +++ b/blockstore/splitstore/splitstore_compact.go @@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error { }) } +func (s *SplitStore) applyProtectors() error { + 
s.mx.Lock() + defer s.mx.Unlock() + + count := 0 + for _, protect := range s.protectors { + err := protect(func(c cid.Cid) error { + s.trackTxnRef(c) + count++ + return nil + }) + + if err != nil { + return xerrors.Errorf("error applying protector: %w", err) + } + } + + if count > 0 { + log.Infof("protected %d references through %d protectors", count, len(s.protectors)) + } + + return nil +} + // --- Compaction --- // Compaction works transactionally with the following algorithm: // - We prepare a transaction, whereby all i/o referenced objects through the API are tracked. @@ -398,6 +422,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error { // we are ready for concurrent marking s.beginTxnMarking(markSet) + // 0. track all protected references at beginning of compaction; anything added later should + // be transactionally protected by the write + log.Info("protecting references with registered protectors") + err = s.applyProtectors() + if err != nil { + return err + } + // 1. mark reachable objects by walking the chain from the current epoch; we keep state roots // and messages until the boundary epoch. log.Info("marking reachable objects") diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 423a76536..26e5c3cc0 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -63,6 +63,20 @@ func testSplitStore(t *testing.T, cfg *Config) { t.Fatal(err) } + // create a garbage block that is protected with a registered protector + protected := blocks.NewBlock([]byte("protected!")) + err = hot.Put(protected) + if err != nil { + t.Fatal(err) + } + + // and another one that is not protected + unprotected := blocks.NewBlock([]byte("unprotected!")) + err = hot.Put(unprotected) + if err != nil { + t.Fatal(err) + } + // open the splitstore ss, err := Open("", ds, hot, cold, cfg) if err != nil { @@ -70,6 +84,11 @@ func testSplitStore(t *testing.T, cfg *Config) { } defer ss.Close() //nolint + // register our protector + ss.AddProtector(func(protect func(cid.Cid) error) error { + return protect(protected.Cid()) + }) + err = ss.Start(chain) if err != nil { t.Fatal(err) } @@ -132,8 +151,8 @@ func testSplitStore(t *testing.T, cfg *Config) { t.Errorf("expected %d blocks, but got %d", 2, coldCnt) } - if hotCnt != 10 { - t.Errorf("expected %d blocks, but got %d", 10, hotCnt) + if hotCnt != 12 { + t.Errorf("expected %d blocks, but got %d", 12, hotCnt) } // trigger a compaction @@ -146,12 +165,41 @@ func testSplitStore(t *testing.T, cfg *Config) { coldCnt = countBlocks(cold) hotCnt = countBlocks(hot) - if coldCnt != 5 { - t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt) + if coldCnt != 6 { + t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt) } - if hotCnt != 17 { - t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt) + if hotCnt != 18 { + t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt) + } + + // ensure our protected block is still there + has, err := hot.Has(protected.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("protected block is missing from hotstore") + } + + // ensure our unprotected block is in the coldstore now + has, err = hot.Has(unprotected.Cid()) + if err != nil { + t.Fatal(err) + } + + if has { + t.Fatal("unprotected block is still in hotstore") + } + + has, err = cold.Has(unprotected.Cid()) + if err != nil { + t.Fatal(err) + } + + if !has { + t.Fatal("unprotected block is missing from coldstore") } // Make sure we can revert without 
panicking. diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 2c804bac2..3a1a7918e 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 56f7b21d4..6d6608fd3 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 514b75335..7fd888a89 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 865c18a3a..f6c8e3ac9 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ return mp, nil } +func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error { + mp.lk.Lock() + defer mp.lk.Unlock() + + for _, mset := range mp.pending { + for _, m := range mset.msgs { + err := f(m.Cid()) + if err != nil { + return err + } + + err = f(m.Message.Cid()) + if err != nil { + return err + } + } + } + + return nil +} + func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) { // check the cache a, f := mp.keyCache[addr] diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index f271249df..2ea8fdec0 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) { return cid.Undef, nil } + func (tma *testMpoolAPI) IsLite() bool { return false } diff --git a/cli/state.go b/cli/state.go index 5bfcf1e8f..d5251fb85 100644 --- a/cli/state.go +++ b/cli/state.go @@ -446,6 +446,9 @@ var StateExecTraceCmd = &cli.Command{ if err != nil { return err } + if lookup == nil { + return fmt.Errorf("failed to find message: %s", mcid) + } ts, err := capi.ChainGetTipSet(ctx, lookup.TipSet) if err != nil { @@ -1491,6 +1494,10 @@ var StateSearchMsgCmd = &cli.Command{ return err } + if mw == nil { + return fmt.Errorf("failed to find message: %s", msg) + } + m, err := api.ChainGetMessage(ctx, msg) if err != nil { return err diff --git a/cmd/tvx/extract_message.go b/cmd/tvx/extract_message.go index 8e993cbd3..71035867f 100644 --- a/cmd/tvx/extract_message.go +++ b/cmd/tvx/extract_message.go @@ -337,6 +337,9 @@ func resolveFromChain(ctx context.Context, api v0api.FullNode, mcid cid.Cid, blo if err != nil { return nil, nil, nil, fmt.Errorf("failed to locate message: %w", err) } + if msgInfo == nil { + return nil, nil, nil, fmt.Errorf("failed to locate message: not found") + } log.Printf("located message at tipset %s (height: %d) with exit code: %s", msgInfo.TipSet, msgInfo.Height, msgInfo.Receipt.ExitCode) diff --git a/extern/storage-sealing/currentdealinfo.go b/extern/storage-sealing/currentdealinfo.go index 44fa68b54..ed93512c2 100644 --- a/extern/storage-sealing/currentdealinfo.go +++ b/extern/storage-sealing/currentdealinfo.go @@ -69,6 +69,10 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err) } + if lookup == nil { + return dealID, nil, xerrors.Errorf("looking for publish deal message %s: not found", 
publishCid) + } + if lookup.Receipt.ExitCode != exitcode.Ok { return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode) } diff --git a/extern/storage-sealing/currentdealinfo_test.go b/extern/storage-sealing/currentdealinfo_test.go index ee51d8c75..b28dd461a 100644 --- a/extern/storage-sealing/currentdealinfo_test.go +++ b/extern/storage-sealing/currentdealinfo_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/require" ) -var errNotFound = errors.New("Could not find") +var errNotFound = errors.New("could not find") func TestGetCurrentDealInfo(t *testing.T) { ctx := context.Background() @@ -180,6 +180,12 @@ func TestGetCurrentDealInfo(t *testing.T) { expectedDealID: zeroDealID, expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid), }, + "search message not found": { + publishCid: dummyCid, + targetProposal: &proposal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: not found", dummyCid), + }, "return code not ok": { publishCid: dummyCid, searchMessageLookup: &MsgLookup{ diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 4a94fb02e..5334fc72e 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -358,8 +358,11 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf } params, pcd, tok, err := m.preCommitParams(ctx, sector) - if params == nil || err != nil { - return err + if err != nil { + return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)}) + } + if params == nil { + return nil // event was sent in preCommitParams } deposit, err := collateralSendAmount(ctx.Context(), m.api, m.maddr, cfg, pcd) @@ -403,9 +406,12 @@ func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector Se } params, deposit, _, err := m.preCommitParams(ctx, sector) - if params == nil || err != nil { + if err != nil { return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitParams: %w", err)}) } + if params == nil { + return nil // event was sent in preCommitParams + } res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params) if err != nil { diff --git a/itests/api_test.go b/itests/api_test.go index 1b60630fd..ba77701a2 100644 --- a/itests/api_test.go +++ b/itests/api_test.go @@ -121,6 +121,7 @@ func (ts *apiSuite) testSearchMsg(t *testing.T) { searchRes, err := full.StateSearchMsg(ctx, types.EmptyTSK, sm.Cid(), lapi.LookbackNoLimit, true) require.NoError(t, err) + require.NotNil(t, searchRes) require.Equalf(t, res.TipSet, searchRes.TipSet, "search ts: %s, different from wait ts: %s", searchRes.TipSet, res.TipSet) } diff --git a/itests/wdpost_test.go b/itests/wdpost_test.go index e5a4fcee1..6764350cc 100644 --- a/itests/wdpost_test.go +++ b/itests/wdpost_test.go @@ -213,12 +213,18 @@ func TestWindowPostBaseFeeNoBurn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + sched := kit.DefaultTestUpgradeSchedule + lastUpgradeHeight := sched[len(sched)-1].Height + och := build.UpgradeClausHeight - build.UpgradeClausHeight = 10 + build.UpgradeClausHeight = lastUpgradeHeight + 1 client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs()) ens.InterconnectAll().BeginMining(blocktime) + // Wait till all upgrades are done and we've passed the clause epoch. 
+ client.WaitTillChain(ctx, kit.HeightAtLeast(build.UpgradeClausHeight+1)) + maddr, err := miner.ActorAddress(ctx) require.NoError(t, err) @@ -268,6 +274,12 @@ func TestWindowPostBaseFeeBurn(t *testing.T) { client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts) ens.InterconnectAll().BeginMining(blocktime) + // Ideally we'd be a bit more precise here, but getting the information we need from the + // test framework is more work than it's worth. + // + // We just need to wait till all upgrades are done. + client.WaitTillChain(ctx, kit.HeightAtLeast(20)) + maddr, err := miner.ActorAddress(ctx) require.NoError(t, err) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 8bd512705..23944af6c 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -78,27 +78,38 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { }) } -type waitReadCloser struct { +// watchReadCloser watches the ReadCloser and closes the watch channel when +// either: (1) the ReadCloser fails on Read (including with a benign error +// like EOF), or (2) Close is called. +// +// Use it to be notified of terminal states, in situations where a Read failure (or +// EOF) is considered a terminal state too (besides Close). +type watchReadCloser struct { io.ReadCloser - wait chan struct{} + watch chan struct{} + closeOnce sync.Once } -func (w *waitReadCloser) Read(p []byte) (int, error) { +func (w *watchReadCloser) Read(p []byte) (int, error) { n, err := w.ReadCloser.Read(p) if err != nil { - close(w.wait) + w.closeOnce.Do(func() { + close(w.watch) + }) } return n, err } -func (w *waitReadCloser) Close() error { - close(w.wait) +func (w *watchReadCloser) Close() error { + w.closeOnce.Do(func() { + close(w.watch) + }) return w.ReadCloser.Close() } func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { var readersLk sync.Mutex - readers := map[uuid.UUID]chan *waitReadCloser{} + readers := map[uuid.UUID]chan *watchReadCloser{} hnd := func(resp http.ResponseWriter, req *http.Request) { strId := path.Base(req.URL.Path) @@ -111,14 +122,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock() - wr := &waitReadCloser{ + wr := &watchReadCloser{ ReadCloser: req.Body, - wait: make(chan struct{}), + watch: make(chan struct{}), } tctx, cancel := context.WithTimeout(req.Context(), Timeout) @@ -134,7 +145,9 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { } select { - case <-wr.wait: + case <-wr.watch: + // TODO should we check if we failed the Read, and if so + // return an HTTP 500? i.e. turn watch into a chan error? 
case <-req.Context().Done(): log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) resp.WriteHeader(500) @@ -167,7 +180,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { readersLk.Lock() ch, found := readers[u] if !found { - ch = make(chan *waitReadCloser) + ch = make(chan *watchReadCloser) readers[u] = ch } readersLk.Unlock() diff --git a/markets/retrievaladapter/provider.go b/markets/retrievaladapter/provider.go index 06ebd989a..2f6305805 100644 --- a/markets/retrievaladapter/provider.go +++ b/markets/retrievaladapter/provider.go @@ -54,7 +54,7 @@ func (rpn *retrievalProviderNode) GetMinerWorkerAddress(ctx context.Context, min func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { log.Debugf("get sector %d, offset %d, length %d", sectorID, offset, length) - si, err := rpn.sectorsStatus(ctx, sectorID, true) + si, err := rpn.sectorsStatus(ctx, sectorID, false) if err != nil { return nil, err } diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index 3f27425ae..b2f107bf4 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -25,6 +25,7 @@ import ( ) func TestDealPublisher(t *testing.T) { + t.Skip("this test randomly fails in various subtests; see issue #6799") testCases := []struct { name string publishPeriod time.Duration diff --git a/node/builder.go b/node/builder.go index dc9e1f8b7..771368917 100644 --- a/node/builder.go +++ b/node/builder.go @@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option { Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore), Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))), Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore), + Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector), ), If(!cfg.EnableSplitstore, Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore), Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore), Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))), Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))), + Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector), ), Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index aa5ff9814..7ba6463e6 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -436,7 +436,19 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid, piece *cid.Cid) if piece != nil && !piece.Equals(*p.PieceCID) { continue } - out = append(out, a.makeRetrievalQuery(ctx, p, root, piece, rm.QueryParams{})) + + // do not rely on local data with respect to peer id + // fetch an up-to-date miner peer id from chain + mi, err := a.StateMinerInfo(ctx, p.Address, types.EmptyTSK) + if err != nil { + return nil, err + } + pp := rm.RetrievalPeer{ + Address: p.Address, + ID: *mi.PeerId, + } + + out = append(out, a.makeRetrievalQuery(ctx, pp, root, piece, rm.QueryParams{})) } return out, nil diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go index e8eb64c15..2588e3f98 100644 --- a/node/modules/blockstore.go +++ b/node/modules/blockstore.go @@ -96,6 +96,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc 
fx.Lifecycle, r repo.Locked } } +func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector { + return s.(dtypes.GCReferenceProtector) +} + +func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector { + return dtypes.NoopGCReferenceProtector{} +} + func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore { return s.(*splitstore.SplitStore).Expose() } diff --git a/node/modules/chain.go b/node/modules/chain.go index 954322948..c4017b8c0 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty return blockservice.New(bs, rem) } -func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { +func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) { mp, err := messagepool.New(mpp, ds, nn, j) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) @@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS return mp.Close() }, }) + protector.AddProtector(mp.ForEachPendingMessage) return mp, nil } diff --git a/node/modules/dtypes/protector.go b/node/modules/dtypes/protector.go new file mode 100644 index 000000000..0d9625fc1 --- /dev/null +++ b/node/modules/dtypes/protector.go @@ -0,0 +1,13 @@ +package dtypes + +import ( + cid "github.com/ipfs/go-cid" +) + +type GCReferenceProtector interface { + AddProtector(func(func(cid.Cid) error) error) +} + +type NoopGCReferenceProtector struct{} + +func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {} diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index a40ae62d0..9323410dd 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -327,6 +327,21 @@ func (fsr *fsLockedRepo) Blockstore(ctx context.Context, domain BlockstoreDomain return } + // + // Tri-state environment variable LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC + // - unset == the default (currently fsync enabled) + // - set with a false-y value == fsync enabled no matter what a future default is + // - set with any other value == fsync is disabled, ignoring defaults (recommended for day-to-day use) + // + if nosyncBs, nosyncBsSet := os.LookupEnv("LOTUS_CHAIN_BADGERSTORE_DISABLE_FSYNC"); nosyncBsSet { + nosyncBs = strings.ToLower(nosyncBs) + if nosyncBs == "" || nosyncBs == "0" || nosyncBs == "false" || nosyncBs == "no" { + opts.SyncWrites = true + } else { + opts.SyncWrites = false + } + } + bs, err := badgerbs.Open(opts) if err != nil { fsr.bsErr = err diff --git a/storage/miner_sealing.go b/storage/miner_sealing.go index a5e838b89..38b24e8c1 100644 --- a/storage/miner_sealing.go +++ b/storage/miner_sealing.go @@ -134,7 +134,7 @@ func (m *Miner) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnC LastErr: info.LastErr, Log: log, // on chain info - SealProof: 0, + SealProof: info.SectorType, Activation: 0, Expiration: 0, DealWeight: big.Zero(),
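To close, a small self-contained Go sketch of the GC reference-protector pattern this patch wires together (`SplitStore.AddProtector`, `dtypes.GCReferenceProtector`, and `MessagePool.ForEachPendingMessage`). Only the `AddProtector(func(func(cid.Cid) error) error)` shape is taken from the patch; `pinSet`, `register`, and `fakeProtector` are hypothetical stand-ins used for illustration.

```go
package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
)

// GCReferenceProtector mirrors the interface added in node/modules/dtypes/protector.go.
type GCReferenceProtector interface {
	AddProtector(func(func(cid.Cid) error) error)
}

// pinSet is a hypothetical component that wants its CIDs kept out of cold storage.
type pinSet struct {
	cids []cid.Cid
}

// ForEachPinned has the same callback shape as MessagePool.ForEachPendingMessage:
// it invokes protect for every CID it wants to keep reachable.
func (p *pinSet) ForEachPinned(protect func(cid.Cid) error) error {
	for _, c := range p.cids {
		if err := protect(c); err != nil {
			return err
		}
	}
	return nil
}

// register wires the pin set into a protector, the same way modules.MessagePool
// registers mp.ForEachPendingMessage with protector.AddProtector.
func register(gc GCReferenceProtector, p *pinSet) {
	gc.AddProtector(p.ForEachPinned)
}

// fakeProtector is a stand-in for the SplitStore: instead of deferring to
// compaction time, it invokes registered protectors immediately.
type fakeProtector struct{}

func (fakeProtector) AddProtector(f func(func(cid.Cid) error) error) {
	_ = f(func(c cid.Cid) error {
		fmt.Println("protected:", c)
		return nil
	})
}

func main() {
	b := blocks.NewBlock([]byte("keep me in the hotstore"))
	register(fakeProtector{}, &pinSet{cids: []cid.Cid{b.Cid()}})
}
```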