From a084445e6c2c6e500b761a4d48859a55251c94b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 16 Jun 2022 13:15:49 +0200 Subject: [PATCH] sealing pipeline: Use non-special ChainHead --- .../storageadapter/ondealsectorcommitted.go | 4 +- storage/adapter_storage_miner.go | 13 ++-- storage/pipeline/checks.go | 8 +-- storage/pipeline/commit_batch.go | 30 ++++----- storage/pipeline/commit_batch_test.go | 36 +++++++++- storage/pipeline/currentdealinfo.go | 1 + storage/pipeline/input.go | 14 ++-- storage/pipeline/mocks/api.go | 15 +++-- storage/pipeline/mocks/mock_commit_batcher.go | 9 ++- .../pipeline/mocks/mock_precommit_batcher.go | 9 ++- storage/pipeline/precommit_batch.go | 14 ++-- storage/pipeline/precommit_batch_test.go | 6 +- storage/pipeline/precommit_policy.go | 12 ++-- storage/pipeline/precommit_policy_test.go | 4 +- storage/pipeline/sealing.go | 2 +- storage/pipeline/states_failed.go | 38 +++++------ storage/pipeline/states_failed_test.go | 2 +- storage/pipeline/states_proving.go | 10 +-- storage/pipeline/states_replica_update.go | 22 +++---- storage/pipeline/states_sealing.go | 66 +++++++++---------- storage/pipeline/upgrade_queue.go | 8 +-- 21 files changed, 174 insertions(+), 149 deletions(-) diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 17e24b92f..fc8591197 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -41,9 +41,9 @@ type SectorCommittedManager struct { dpc diffPreCommitsAPI } -func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI pipeline.CurrentDealInfoTskAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager { +func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI pipeline.CurrentDealInfoAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager { dim := &pipeline.CurrentDealInfoManager{ - CDAPI: &pipeline.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI}, + CDAPI: tskAPI, } return newSectorCommittedManager(ev, dim, dpcAPI) } diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index abf35f78e..b253f26ae 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -111,6 +111,10 @@ func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSet return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced) } +func (s SealingAPIAdapter) ChainHead(ctx context.Context) (*types.TipSet, error) { + return s.delegate.ChainHead(ctx) +} + func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) { nv, err := s.delegate.StateNetworkVersion(ctx, tsk) @@ -233,15 +237,6 @@ func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, d return deal.Proposal, nil } -func (s SealingAPIAdapter) ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) { - head, err := s.delegate.ChainHead(ctx) - if err != nil { - return types.EmptyTSK, 0, err - } - - return head.Key(), head.Height(), nil -} - func (s SealingAPIAdapter) ChainBaseFee(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) { ts, err := s.delegate.ChainGetTipSet(ctx, tsk) if err != nil { diff --git a/storage/pipeline/checks.go b/storage/pipeline/checks.go index 1a6a6599f..96800b3d6 100644 --- a/storage/pipeline/checks.go +++ b/storage/pipeline/checks.go @@ -40,7 +40,7 @@ type ErrBadRU struct{ error } type ErrBadPR struct{ error } func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error { - tok, height, err := api.ChainHead(ctx) + ts, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } @@ -60,7 +60,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api dealCount++ - proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok) + proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, ts.Key()) if err != nil { return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)} } @@ -77,8 +77,8 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize)} } - if height >= proposal.StartEpoch { - return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)} + if ts.Height() >= proposal.StartEpoch { + return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, ts.Height())} } } diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index ab63861f6..78f83f111 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -37,7 +37,7 @@ var aggFeeDen = big.NewInt(100) type CommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) - ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) + ChainHead(ctx context.Context) (*types.TipSet, error) ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) @@ -204,14 +204,14 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, var res []sealiface.CommitBatchRes - tok, h, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } blackedOut := func() bool { const nv16BlackoutWindow = abi.ChainEpoch(20) // a magik number - if h <= build.UpgradeSkyrHeight && build.UpgradeSkyrHeight-h < nv16BlackoutWindow { + if ts.Height() <= build.UpgradeSkyrHeight && build.UpgradeSkyrHeight-ts.Height() < nv16BlackoutWindow { return true } return false @@ -221,7 +221,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) { - bf, err := b.api.ChainBaseFee(b.mctx, tok) + bf, err := b.api.ChainBaseFee(b.mctx, ts.Key()) if err != nil { return nil, xerrors.Errorf("couldn't get base fee: %w", err) } @@ -265,7 +265,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes, } func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) { - tok, _, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } @@ -292,7 +292,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa res.Sectors = append(res.Sectors, id) - sc, err := b.getSectorCollateral(id, tok) + sc, err := b.getSectorCollateral(id, ts.Key()) if err != nil { res.FailedSectors[id] = err.Error() continue @@ -321,7 +321,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err) } - nv, err := b.api.StateNetworkVersion(b.mctx, tok) + nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key()) if err != nil { log.Errorf("getting network version: %s", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err) @@ -354,7 +354,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos)) - bf, err := b.api.ChainBaseFee(b.mctx, tok) + bf, err := b.api.ChainBaseFee(b.mctx, ts.Key()) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err) } @@ -412,7 +412,7 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C } } - tok, _, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } @@ -425,7 +425,7 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C FailedSectors: map[abi.SectorNumber]string{}, } - mcid, err := b.processSingle(cfg, mi, &avail, sn, info, tok) + mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key()) if err != nil { log.Errorf("process single error: %+v", err) // todo: return to user r.FailedSectors[sn] = err.Error() @@ -569,18 +569,18 @@ func (b *CommitBatcher) Stop(ctx context.Context) error { // TODO: If this returned epochs, it would make testing much easier func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) { - tok, curEpoch, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { return time.Now(), xerrors.Errorf("getting chain head: %s", err) } - nv, err := b.api.StateNetworkVersion(b.mctx, tok) + nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key()) if err != nil { log.Errorf("getting network version: %s", err) return time.Now(), xerrors.Errorf("getting network version: %s", err) } - pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok) + pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, ts.Key()) if err != nil { log.Errorf("getting precommit info: %s", err) return time.Now(), err @@ -609,11 +609,11 @@ func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) { } } - if cutoffEpoch <= curEpoch { + if cutoffEpoch <= ts.Height() { return time.Now(), nil } - return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil + return time.Now().Add(time.Duration(cutoffEpoch-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second), nil } func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok types.TipSetKey) (abi.TokenAmount, error) { diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index 918ef0037..32ea9d931 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -10,12 +10,14 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" minertypes "github.com/filecoin-project/go-state-types/builtin/v8/miner" + "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" prooftypes "github.com/filecoin-project/go-state-types/proof" miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" @@ -105,7 +107,7 @@ func TestCommitBatcher(t *testing.T) { SectorNumber: sn, } - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&minertypes.SectorPreCommitOnChainInfo{ PreCommitDeposit: big.Zero(), @@ -166,7 +168,7 @@ func TestCommitBatcher(t *testing.T) { basefee = types.NanoFil } - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) if batch { s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil) } @@ -176,7 +178,7 @@ func TestCommitBatcher(t *testing.T) { ti = len(expect) } - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) pciC := len(expect) if failOnePCI { @@ -385,3 +387,31 @@ func (f fakeProver) AggregateSealProofs(aggregateInfo prooftypes.AggregateSealVe } var _ ffiwrapper.Prover = &fakeProver{} + +func makeTs(t *testing.T, h abi.ChainEpoch) *types.TipSet { + a, _ := address.NewFromString("t00") + dummyCid, _ := cid.Parse("bafkqaaa") + + var ts, err = types.NewTipSet([]*types.BlockHeader{ + { + Height: h, + Miner: a, + + Parents: []cid.Cid{}, + + Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, + + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }, + }) + if t != nil { + require.NoError(t, err) + } + + return ts +} diff --git a/storage/pipeline/currentdealinfo.go b/storage/pipeline/currentdealinfo.go index 99086c753..d39ef9a5d 100644 --- a/storage/pipeline/currentdealinfo.go +++ b/storage/pipeline/currentdealinfo.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index beed6f0cc..87270af39 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -124,7 +124,7 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay) // check deal age, start sealing when the deal closest to starting is within slack time - _, current, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) blockTime := time.Second * time.Duration(build.BlockDelaySecs) if err != nil { return false, xerrors.Errorf("API error getting head: %w", err) @@ -134,7 +134,7 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, continue } dealSafeSealEpoch := piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer - dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-current) * blockTime) + dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime) if dealSafeSealTime.Before(sealTime) { sealTime = dealSafeSealTime } @@ -303,14 +303,14 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec return api.SectorOffset{}, xerrors.Errorf("getting config: %w", err) } - _, head, err := m.Api.ChainHead(ctx) + ts, err := m.Api.ChainHead(ctx) if err != nil { return api.SectorOffset{}, xerrors.Errorf("couldnt get chain head: %w", err) } - if head+cfg.StartEpochSealingBuffer > deal.DealProposal.StartEpoch { + if ts.Height()+cfg.StartEpochSealingBuffer > deal.DealProposal.StartEpoch { return api.SectorOffset{}, xerrors.Errorf( "cannot add piece for deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", - deal.DealProposal.PieceCID, head, deal.DealProposal.StartEpoch) + deal.DealProposal.PieceCID, ts.Height(), deal.DealProposal.StartEpoch) } m.inputLk.Lock() @@ -544,14 +544,14 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize } } - _, curEpoch, err := m.Api.ChainHead(ctx) + ts, err := m.Api.ChainHead(ctx) if err != nil { return 0, 0, xerrors.Errorf("getting current epoch: %w", err) } minDur, maxDur := policy.DealDurationBounds(0) - return curEpoch + minDur, curEpoch + maxDur, nil + return ts.Height() + minDur, ts.Height() + maxDur, nil } func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index c7be5c0ae..f62928afd 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -8,6 +8,9 @@ import ( context "context" reflect "reflect" + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" + address "github.com/filecoin-project/go-address" abi "github.com/filecoin-project/go-state-types/abi" big "github.com/filecoin-project/go-state-types/big" @@ -16,11 +19,10 @@ import ( crypto "github.com/filecoin-project/go-state-types/crypto" dline "github.com/filecoin-project/go-state-types/dline" network "github.com/filecoin-project/go-state-types/network" + api "github.com/filecoin-project/lotus/api" types "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/storage/pipeline" - gomock "github.com/golang/mock/gomock" - cid "github.com/ipfs/go-cid" ) // MockSealingAPI is a mock of SealingAPI interface. @@ -77,13 +79,12 @@ func (mr *MockSealingAPIMockRecorder) ChainGetMessage(arg0, arg1 interface{}) *g } // ChainHead mocks base method. -func (m *MockSealingAPI) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) { +func (m *MockSealingAPI) ChainHead(arg0 context.Context) (*types.TipSet, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ChainHead", arg0) - ret0, _ := ret[0].(types.TipSetKey) - ret1, _ := ret[1].(abi.ChainEpoch) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret0, _ := ret[0].(*types.TipSet) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ChainHead indicates an expected call of ChainHead. diff --git a/storage/pipeline/mocks/mock_commit_batcher.go b/storage/pipeline/mocks/mock_commit_batcher.go index 55efe4305..f46e9e304 100644 --- a/storage/pipeline/mocks/mock_commit_batcher.go +++ b/storage/pipeline/mocks/mock_commit_batcher.go @@ -60,13 +60,12 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) } // ChainHead mocks base method. -func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) { +func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (*types.TipSet, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ChainHead", arg0) - ret0, _ := ret[0].(types.TipSetKey) - ret1, _ := ret[1].(abi.ChainEpoch) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret0, _ := ret[0].(*types.TipSet) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ChainHead indicates an expected call of ChainHead. diff --git a/storage/pipeline/mocks/mock_precommit_batcher.go b/storage/pipeline/mocks/mock_precommit_batcher.go index 88b1c03c9..5352d3803 100644 --- a/storage/pipeline/mocks/mock_precommit_batcher.go +++ b/storage/pipeline/mocks/mock_precommit_batcher.go @@ -59,13 +59,12 @@ func (mr *MockPreCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface } // ChainHead mocks base method. -func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) { +func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (*types.TipSet, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ChainHead", arg0) - ret0, _ := ret[0].(types.TipSetKey) - ret1, _ := ret[1].(abi.ChainEpoch) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret0, _ := ret[0].(*types.TipSet) + ret1, _ := ret[1].(error) + return ret0, ret1 } // ChainHead indicates an expected call of ChainHead. diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index b8afdf0c7..856a71fdb 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -31,7 +31,7 @@ type PreCommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) - ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) + ChainHead(ctx context.Context) (*types.TipSet, error) ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error) StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error) } @@ -188,18 +188,18 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat return nil, nil } - tok, _, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { return nil, err } - bf, err := b.api.ChainBaseFee(b.mctx, tok) + bf, err := b.api.ChainBaseFee(b.mctx, ts.Key()) if err != nil { return nil, xerrors.Errorf("couldn't get base fee: %w", err) } // TODO: Drop this once nv14 has come and gone - nv, err := b.api.StateNetworkVersion(b.mctx, tok) + nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key()) if err != nil { return nil, xerrors.Errorf("couldn't get network version: %w", err) } @@ -212,7 +212,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat // todo support multiple batches var res []sealiface.PreCommitBatchRes if !individual { - res, err = b.processBatch(cfg, tok, bf, nv) + res, err = b.processBatch(cfg, ts.Key(), bf, nv) } else { res, err = b.processIndividually(cfg) } @@ -378,13 +378,13 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tok types.TipSetKe // register PreCommit, wait for batch message, return message CID func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) { - _, curEpoch, err := b.api.ChainHead(b.mctx) + ts, err := b.api.ChainHead(b.mctx) if err != nil { log.Errorf("getting chain head: %s", err) return sealiface.PreCommitBatchRes{}, err } - cutoff, err := getPreCommitCutoff(curEpoch, s) + cutoff, err := getPreCommitCutoff(ts.Height(), s) if err != nil { return sealiface.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err) } diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index 24403c706..3c88af077 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -109,7 +109,7 @@ func TestPrecommitBatcher(t *testing.T) { SectorNumber: sn, } - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) go func() { defer done.Unlock() @@ -153,7 +153,7 @@ func TestPrecommitBatcher(t *testing.T) { //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001 expectSend := func(expect []abi.SectorNumber) action { return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(10001), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) @@ -174,7 +174,7 @@ func TestPrecommitBatcher(t *testing.T) { //stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001 expectSendsSingle := func(expect []abi.SectorNumber) action { return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise { - s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil) + s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil) s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(9999), nil) s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil) diff --git a/storage/pipeline/precommit_policy.go b/storage/pipeline/precommit_policy.go index db6394be3..8aa5ff4b5 100644 --- a/storage/pipeline/precommit_policy.go +++ b/storage/pipeline/precommit_policy.go @@ -19,7 +19,7 @@ type PreCommitPolicy interface { } type Chain interface { - ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) + ChainHead(context.Context) (*types.TipSet, error) StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error) } @@ -59,7 +59,7 @@ func NewBasicPreCommitPolicy(api Chain, cfgGetter GetSealingConfigFunc, provingB // Expiration produces the pre-commit sector expiration epoch for an encoded // replica containing the provided enumeration of pieces and deals. func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi.ChainEpoch, error) { - _, epoch, err := p.api.ChainHead(ctx) + ts, err := p.api.ChainHead(ctx) if err != nil { return 0, err } @@ -71,8 +71,8 @@ func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi continue } - if p.DealInfo.DealSchedule.EndEpoch < epoch { - log.Warnf("piece schedule %+v ended before current epoch %d", p, epoch) + if p.DealInfo.DealSchedule.EndEpoch < ts.Height() { + log.Warnf("piece schedule %+v ended before current epoch %d", p, ts.Height()) continue } @@ -89,13 +89,13 @@ func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi return 0, err } - tmp := epoch + expirationDuration + tmp := ts.Height() + expirationDuration end = &tmp } // Ensure there is at least one day for the PC message to land without falling below min sector lifetime // TODO: The "one day" should probably be a config, though it doesn't matter too much - minExp := epoch + policy.GetMinSectorExpiration() + miner.WPoStProvingPeriod + minExp := ts.Height() + policy.GetMinSectorExpiration() + miner.WPoStProvingPeriod if *end < minExp { end = &minExp } diff --git a/storage/pipeline/precommit_policy_test.go b/storage/pipeline/precommit_policy_test.go index 544502cf9..dd5764fff 100644 --- a/storage/pipeline/precommit_policy_test.go +++ b/storage/pipeline/precommit_policy_test.go @@ -46,8 +46,8 @@ func (f *fakeChain) StateNetworkVersion(ctx context.Context, tok types.TipSetKey return build.NewestNetworkVersion, nil } -func (f *fakeChain) ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) { - return types.NewTipSetKey(), f.h, nil +func (f *fakeChain) ChainHead(ctx context.Context) (*types.TipSet, error) { + return makeTs(nil, f.h), nil } func fakePieceCid(t *testing.T) cid.Cid { diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 1f7b0d53d..d690eaf49 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -69,7 +69,7 @@ type SealingAPI interface { StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error) StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok types.TipSetKey) ([]api.Partition, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) - ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) + ChainHead(ctx context.Context) (*types.TipSet, error) ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok types.TipSetKey) (abi.Randomness, error) diff --git a/storage/pipeline/states_failed.go b/storage/pipeline/states_failed.go index 552fcee68..bd1413ad5 100644 --- a/storage/pipeline/states_failed.go +++ b/storage/pipeline/states_failed.go @@ -2,7 +2,6 @@ package sealing import ( "context" - "github.com/filecoin-project/lotus/api" "time" "github.com/hashicorp/go-multierror" @@ -15,6 +14,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/types" ) @@ -38,13 +38,13 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { } func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitOnChainInfo, bool) { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err) return nil, false } - info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err) return nil, false @@ -74,14 +74,14 @@ func (m *Sealing) handleSealPrecommit2Failed(ctx statemachine.Context, sector Se } func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, height, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) return nil } if sector.PreCommitMessage != nil { - mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.PreCommitMessage, api.LookbackNoLimit, true) + mw, err := m.Api.StateSearchMsg(ctx.Context(), ts.Key(), *sector.PreCommitMessage, api.LookbackNoLimit, true) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { @@ -108,7 +108,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI } } - if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, ts.Key(), ts.Height(), m.Api); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) @@ -141,7 +141,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI if pci, is := m.checkPreCommitted(ctx, sector); is && pci != nil { if sector.PreCommitMessage == nil { log.Warnf("sector %d is precommitted on chain, but we don't have precommit message", sector.SectorNumber) - return ctx.Send(SectorPreCommitLanded{TipSet: tok}) + return ctx.Send(SectorPreCommitLanded{TipSet: ts.Key()}) } if pci.Info.SealedCID != *sector.CommR { @@ -210,13 +210,13 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect } } - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitReplicaUpdateFailed: api error, not proceeding: %+v", err) return nil } - if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil { + if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, ts.Key(), m.Api); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handleSubmitReplicaUpdateFailed: api error, not proceeding: %+v", err) @@ -239,7 +239,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect } // Abort upgrade for sectors that went faulty since being marked for upgrade - active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber) + active, err := m.sectorActive(ctx.Context(), ts.Key(), sector.SectorNumber) if err != nil { log.Errorf("sector active check: api error, not proceeding: %+v", err) return nil @@ -263,14 +263,14 @@ func (m *Sealing) handleReleaseSectorKeyFailed(ctx statemachine.Context, sector } func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } if sector.CommitMessage != nil { - mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.CommitMessage, api.LookbackNoLimit, true) + mw, err := m.Api.StateSearchMsg(ctx.Context(), ts.Key(), *sector.CommitMessage, api.LookbackNoLimit, true) if err != nil { // API error if err := failedCooldown(ctx, sector); err != nil { @@ -297,7 +297,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo } } - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil { switch err.(type) { case *ErrApi: log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err) @@ -437,7 +437,7 @@ func (m *Sealing) handleRecoverDealIDsOrFailWith(ctx statemachine.Context, secto if err != nil { return err } - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { return err } @@ -459,7 +459,7 @@ func (m *Sealing) handleRecoverDealIDsOrFailWith(ctx statemachine.Context, secto mdp := *p.DealInfo.DealProposal dp = &mdp } - res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid) + res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), ts.Key(), dp, *p.DealInfo.PublishCid) if err != nil { failed[i] = xerrors.Errorf("getting current deal info for piece %d: %w", i, err) continue @@ -510,7 +510,7 @@ func (m *Sealing) handleSnapDealsRecoverDealIDs(ctx statemachine.Context, sector } func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, maddr address.Address) ([]int, int, error) { - tok, height, err := api.ChainHead(ctx) + ts, err := api.ChainHead(ctx) if err != nil { return nil, 0, xerrors.Errorf("getting chain head: %w", err) } @@ -530,7 +530,7 @@ func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, continue } - proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok) + proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, ts.Key()) if err != nil { log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err) toFix = append(toFix, i) @@ -555,10 +555,10 @@ func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, continue } - if height >= proposal.StartEpoch { + if ts.Height() >= proposal.StartEpoch { // TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces // (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval) - return nil, 0, xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height) + return nil, 0, xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, ts.Height()) } } diff --git a/storage/pipeline/states_failed_test.go b/storage/pipeline/states_failed_test.go index 02846f93f..468f7fb3d 100644 --- a/storage/pipeline/states_failed_test.go +++ b/storage/pipeline/states_failed_test.go @@ -4,7 +4,6 @@ package sealing_test import ( "bytes" "context" - "github.com/filecoin-project/lotus/chain/types" "testing" "github.com/golang/mock/gomock" @@ -21,6 +20,7 @@ import ( api2 "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/types" pipeline "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/mocks" ) diff --git a/storage/pipeline/states_proving.go b/storage/pipeline/states_proving.go index 7110193fa..f4b957249 100644 --- a/storage/pipeline/states_proving.go +++ b/storage/pipeline/states_proving.go @@ -1,7 +1,6 @@ package sealing import ( - "github.com/filecoin-project/lotus/api" "time" "golang.org/x/xerrors" @@ -9,6 +8,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" @@ -98,21 +98,21 @@ func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInf func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error { for { - tok, epoch, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)}) } - nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)}) } - if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) { + if ts.Height() >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) { return ctx.Send(SectorRemove{}) } - toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second + toWait := time.Duration(ts.Height()-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second select { case <-time.After(toWait): continue diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 89e96ffe2..8ecf206d6 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -3,7 +3,6 @@ package sealing import ( "bytes" "context" - "github.com/filecoin-project/lotus/build" "time" "golang.org/x/xerrors" @@ -16,6 +15,7 @@ import ( "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" ) @@ -41,12 +41,12 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect return xerrors.Errorf("invalid sector %d with nil CommR", sector.SectorNumber) } // Abort upgrade for sectors that went faulty since being marked for upgrade - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err) return nil } - active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber) + active, err := m.sectorActive(ctx.Context(), ts.Key(), sector.SectorNumber) if err != nil { log.Errorf("sector active check: api error, not proceeding: %+v", err) return nil @@ -77,17 +77,17 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err) return nil } - if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil { + if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, ts.Key(), m.Api); err != nil { return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, tok) + sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err) return nil @@ -121,7 +121,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return xerrors.Errorf("getting config: %w", err) } - onChainInfo, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + onChainInfo, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err) return nil @@ -144,7 +144,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec //ReplaceSectorNumber: 0, } - collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, virtualPCI, tok) + collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, virtualPCI, ts.Key()) if err != nil { return xerrors.Errorf("getting initial pledge collateral: %w", err) } @@ -162,7 +162,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee)) - mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok) + mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err) return nil @@ -241,12 +241,12 @@ func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector Sector return err } - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { return err } - nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) if err != nil { return err } diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index e65b50322..9840d40a8 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -3,7 +3,6 @@ package sealing import ( "bytes" "context" - "github.com/filecoin-project/lotus/build" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -21,6 +20,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" @@ -120,7 +120,7 @@ func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.C } func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) { - tok, epoch, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("getTicket: api error, not proceeding: %+v", err) return nil, 0, false, nil @@ -134,13 +134,13 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se return nil, 0, false, nil } - ticketEpoch := epoch - policy.SealRandomnessLookback + ticketEpoch := ts.Height() - policy.SealRandomnessLookback buf := new(bytes.Buffer) if err := m.maddr.MarshalCBOR(buf); err != nil { return nil, 0, allocated, err } - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err) } @@ -148,7 +148,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se if pci != nil { ticketEpoch = pci.Info.SealRandEpoch - nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) if err != nil { return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err) } @@ -162,7 +162,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se return nil, 0, allocated, xerrors.Errorf("getTicket: max prove commit duration policy error, not proceeding: %w", err) } - if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) { + if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) { return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector") } } @@ -171,7 +171,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber) } - rand, err := m.Api.StateGetRandomnessFromTickets(ctx.Context(), crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), tok) + rand, err := m.Api.StateGetRandomnessFromTickets(ctx.Context(), crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key()) if err != nil { return nil, 0, allocated, err } @@ -217,14 +217,14 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } } - tok, height, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) return nil } - if checkTicketExpired(sector.TicketEpoch, height) { - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + if checkTicketExpired(sector.TicketEpoch, ts.Height()) { + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err) return nil @@ -234,7 +234,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorOldTicket{}) // go get new ticket } - nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) if err != nil { log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err) return nil @@ -252,7 +252,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) } // if height > PreCommitEpoch + msd, there is no need to recalculate - if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) { + if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) { return ctx.Send(SectorOldTicket{}) // will be removed } } @@ -284,13 +284,13 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) } func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitInfo, big.Int, types.TipSetKey, error) { - tok, height, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) return nil, big.Zero(), types.EmptyTSK, nil } - if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil { + if err := checkPrecommit(ctx.Context(), m.Address(), sector, ts.Key(), ts.Height(), m.Api); err != nil { switch err := err.(type) { case *ErrApi: log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) @@ -307,7 +307,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( case *ErrExpiredDeals: return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)}) case *ErrPrecommitOnChain: - return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit + return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorPreCommitLanded{TipSet: ts.Key()}) // we re-did precommit case *ErrSectorNumberAllocated: log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) // TODO: check if the sector is committed (not sure how we'd end up here) @@ -322,7 +322,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)}) } - nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok) + nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key()) if err != nil { return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)}) } @@ -341,7 +341,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( } // Assume: both precommit msg & commit msg land on chain as early as possible - maxExpiration := height + policy.GetPreCommitChallengeDelay() + policy.GetMaxSectorExpirationExtension() + maxExpiration := ts.Height() + policy.GetPreCommitChallengeDelay() + policy.GetMaxSectorExpirationExtension() if expiration > maxExpiration { expiration = maxExpiration } @@ -356,12 +356,12 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) ( DealIDs: sector.dealIDs(), } - collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok) + collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, ts.Key()) if err != nil { return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("getting initial pledge collateral: %w", err) } - return params, collateral, tok, nil + return params, collateral, ts.Key(), nil } func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { @@ -482,13 +482,13 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf } func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleWaitSeed: api error, not proceeding: %+v", err) return nil } - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } @@ -501,7 +501,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er err = m.events.ChainAt(func(ectx context.Context, _ types.TipSetKey, curH abi.ChainEpoch) error { // in case of null blocks the randomness can land after the tipset we // get from the events API - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil @@ -511,9 +511,9 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er if err := m.maddr.MarshalCBOR(buf); err != nil { return err } - rand, err := m.Api.StateGetRandomnessFromBeacon(ectx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes(), tok) + rand, err := m.Api.StateGetRandomnessFromBeacon(ectx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes(), ts.Key()) if err != nil { - err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, tok, err) + err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, ts.Key(), err) _ = ctx.Send(SectorChainPreCommitFailed{error: err}) return err @@ -577,13 +577,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) } { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } - if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, proof, ts.Key()); err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) } } @@ -616,13 +616,13 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo } } - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) return nil } - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) } @@ -636,13 +636,13 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) } - mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok) + mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key()) if err != nil { log.Errorf("handleCommitting: api error, not proceeding: %+v", err) return nil } - pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok) + pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key()) if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } @@ -650,7 +650,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")}) } - collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, tok) + collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key()) if err != nil { return xerrors.Errorf("getting initial pledge collateral: %w", err) } @@ -701,13 +701,13 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S }) if err != nil || res.Error != "" { - tok, _, err := m.Api.ChainHead(ctx.Context()) + ts, err := m.Api.ChainHead(ctx.Context()) if err != nil { log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err) return nil } - if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil { + if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) } diff --git a/storage/pipeline/upgrade_queue.go b/storage/pipeline/upgrade_queue.go index ddbabb9f2..d3c2d4184 100644 --- a/storage/pipeline/upgrade_queue.go +++ b/storage/pipeline/upgrade_queue.go @@ -25,16 +25,16 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e return xerrors.Errorf("not a committed-capacity sector, has deals") } - tok, head, err := m.Api.ChainHead(ctx) + ts, err := m.Api.ChainHead(ctx) if err != nil { return xerrors.Errorf("couldnt get chain head: %w", err) } - onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, id, tok) + onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, id, ts.Key()) if err != nil { return xerrors.Errorf("failed to read sector on chain info: %w", err) } - active, err := m.sectorActive(ctx, tok, id) + active, err := m.sectorActive(ctx, ts.Key(), id) if err != nil { return xerrors.Errorf("failed to check if sector is active") } @@ -42,7 +42,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e return xerrors.Errorf("cannot mark inactive sector for upgrade") } - if onChainInfo.Expiration-head < market7.DealMinDuration { + if onChainInfo.Expiration-ts.Height() < market7.DealMinDuration { return xerrors.Errorf("pointless to upgrade sector %d, expiration %d is less than a min deal duration away from current epoch."+ "Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration) }