sealing pipeline: Use non-special ChainHead

This commit is contained in:
Łukasz Magiera 2022-06-16 13:15:49 +02:00
parent 048bfe6d5b
commit a084445e6c
21 changed files with 174 additions and 149 deletions

View File

@ -41,9 +41,9 @@ type SectorCommittedManager struct {
dpc diffPreCommitsAPI
}
func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI pipeline.CurrentDealInfoTskAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI pipeline.CurrentDealInfoAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager {
dim := &pipeline.CurrentDealInfoManager{
CDAPI: &pipeline.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI},
CDAPI: tskAPI,
}
return newSectorCommittedManager(ev, dim, dpcAPI)
}

View File

@ -111,6 +111,10 @@ func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSet
return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced)
}
func (s SealingAPIAdapter) ChainHead(ctx context.Context) (*types.TipSet, error) {
return s.delegate.ChainHead(ctx)
}
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) {
nv, err := s.delegate.StateNetworkVersion(ctx, tsk)
@ -233,15 +237,6 @@ func (s SealingAPIAdapter) StateMarketStorageDealProposal(ctx context.Context, d
return deal.Proposal, nil
}
func (s SealingAPIAdapter) ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) {
head, err := s.delegate.ChainHead(ctx)
if err != nil {
return types.EmptyTSK, 0, err
}
return head.Key(), head.Height(), nil
}
func (s SealingAPIAdapter) ChainBaseFee(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) {
ts, err := s.delegate.ChainGetTipSet(ctx, tsk)
if err != nil {

View File

@ -40,7 +40,7 @@ type ErrBadRU struct{ error }
type ErrBadPR struct{ error }
func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error {
tok, height, err := api.ChainHead(ctx)
ts, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
@ -60,7 +60,7 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
dealCount++
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok)
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, ts.Key())
if err != nil {
return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)}
}
@ -77,8 +77,8 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api
return &ErrInvalidDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers deal %d with different size: %d != %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, p.Piece.Size, proposal.PieceSize)}
}
if height >= proposal.StartEpoch {
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)}
if ts.Height() >= proposal.StartEpoch {
return &ErrExpiredDeals{xerrors.Errorf("piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(si.Pieces), si.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, ts.Height())}
}
}

View File

@ -37,7 +37,7 @@ var aggFeeDen = big.NewInt(100)
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
@ -204,14 +204,14 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
var res []sealiface.CommitBatchRes
tok, h, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
blackedOut := func() bool {
const nv16BlackoutWindow = abi.ChainEpoch(20) // a magik number
if h <= build.UpgradeSkyrHeight && build.UpgradeSkyrHeight-h < nv16BlackoutWindow {
if ts.Height() <= build.UpgradeSkyrHeight && build.UpgradeSkyrHeight-ts.Height() < nv16BlackoutWindow {
return true
}
return false
@ -221,7 +221,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
if !individual && !cfg.AggregateAboveBaseFee.Equals(big.Zero()) {
bf, err := b.api.ChainBaseFee(b.mctx, tok)
bf, err := b.api.ChainBaseFee(b.mctx, ts.Key())
if err != nil {
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
}
@ -265,7 +265,7 @@ func (b *CommitBatcher) maybeStartBatch(notif bool) ([]sealiface.CommitBatchRes,
}
func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBatchRes, error) {
tok, _, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
@ -292,7 +292,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
res.Sectors = append(res.Sectors, id)
sc, err := b.getSectorCollateral(id, tok)
sc, err := b.getSectorCollateral(id, ts.Key())
if err != nil {
res.FailedSectors[id] = err.Error()
continue
@ -321,7 +321,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting miner id: %w", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key())
if err != nil {
log.Errorf("getting network version: %s", err)
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("getting network version: %s", err)
@ -354,7 +354,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
maxFee := b.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos))
bf, err := b.api.ChainBaseFee(b.mctx, tok)
bf, err := b.api.ChainBaseFee(b.mctx, ts.Key())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("couldn't get base fee: %w", err)
}
@ -412,7 +412,7 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C
}
}
tok, _, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
@ -425,7 +425,7 @@ func (b *CommitBatcher) processIndividually(cfg sealiface.Config) ([]sealiface.C
FailedSectors: map[abi.SectorNumber]string{},
}
mcid, err := b.processSingle(cfg, mi, &avail, sn, info, tok)
mcid, err := b.processSingle(cfg, mi, &avail, sn, info, ts.Key())
if err != nil {
log.Errorf("process single error: %+v", err) // todo: return to user
r.FailedSectors[sn] = err.Error()
@ -569,18 +569,18 @@ func (b *CommitBatcher) Stop(ctx context.Context) error {
// TODO: If this returned epochs, it would make testing much easier
func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
tok, curEpoch, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return time.Now(), xerrors.Errorf("getting chain head: %s", err)
}
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key())
if err != nil {
log.Errorf("getting network version: %s", err)
return time.Now(), xerrors.Errorf("getting network version: %s", err)
}
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, tok)
pci, err := b.api.StateSectorPreCommitInfo(b.mctx, b.maddr, si.SectorNumber, ts.Key())
if err != nil {
log.Errorf("getting precommit info: %s", err)
return time.Now(), err
@ -609,11 +609,11 @@ func (b *CommitBatcher) getCommitCutoff(si SectorInfo) (time.Time, error) {
}
}
if cutoffEpoch <= curEpoch {
if cutoffEpoch <= ts.Height() {
return time.Now(), nil
}
return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
return time.Now().Add(time.Duration(cutoffEpoch-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second), nil
}
func (b *CommitBatcher) getSectorCollateral(sn abi.SectorNumber, tok types.TipSetKey) (abi.TokenAmount, error) {

View File

@ -10,12 +10,14 @@ import (
"time"
"github.com/golang/mock/gomock"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
minertypes "github.com/filecoin-project/go-state-types/builtin/v8/miner"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network"
prooftypes "github.com/filecoin-project/go-state-types/proof"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
@ -105,7 +107,7 @@ func TestCommitBatcher(t *testing.T) {
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&minertypes.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
@ -166,7 +168,7 @@ func TestCommitBatcher(t *testing.T) {
basefee = types.NanoFil
}
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
if batch {
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
}
@ -176,7 +178,7 @@ func TestCommitBatcher(t *testing.T) {
ti = len(expect)
}
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
pciC := len(expect)
if failOnePCI {
@ -385,3 +387,31 @@ func (f fakeProver) AggregateSealProofs(aggregateInfo prooftypes.AggregateSealVe
}
var _ ffiwrapper.Prover = &fakeProver{}
func makeTs(t *testing.T, h abi.ChainEpoch) *types.TipSet {
a, _ := address.NewFromString("t00")
dummyCid, _ := cid.Parse("bafkqaaa")
var ts, err = types.NewTipSet([]*types.BlockHeader{
{
Height: h,
Miner: a,
Parents: []cid.Cid{},
Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}},
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
},
})
if t != nil {
require.NoError(t, err)
}
return ts
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

View File

@ -124,7 +124,7 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
// check deal age, start sealing when the deal closest to starting is within slack time
_, current, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
blockTime := time.Second * time.Duration(build.BlockDelaySecs)
if err != nil {
return false, xerrors.Errorf("API error getting head: %w", err)
@ -134,7 +134,7 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
continue
}
dealSafeSealEpoch := piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer
dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-current) * blockTime)
dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime)
if dealSafeSealTime.Before(sealTime) {
sealTime = dealSafeSealTime
}
@ -303,14 +303,14 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
return api.SectorOffset{}, xerrors.Errorf("getting config: %w", err)
}
_, head, err := m.Api.ChainHead(ctx)
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("couldnt get chain head: %w", err)
}
if head+cfg.StartEpochSealingBuffer > deal.DealProposal.StartEpoch {
if ts.Height()+cfg.StartEpochSealingBuffer > deal.DealProposal.StartEpoch {
return api.SectorOffset{}, xerrors.Errorf(
"cannot add piece for deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
deal.DealProposal.PieceCID, head, deal.DealProposal.StartEpoch)
deal.DealProposal.PieceCID, ts.Height(), deal.DealProposal.StartEpoch)
}
m.inputLk.Lock()
@ -544,14 +544,14 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
}
}
_, curEpoch, err := m.Api.ChainHead(ctx)
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
}
minDur, maxDur := policy.DealDurationBounds(0)
return curEpoch + minDur, curEpoch + maxDur, nil
return ts.Height() + minDur, ts.Height() + maxDur, nil
}
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {

View File

@ -8,6 +8,9 @@ import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
@ -16,11 +19,10 @@ import (
crypto "github.com/filecoin-project/go-state-types/crypto"
dline "github.com/filecoin-project/go-state-types/dline"
network "github.com/filecoin-project/go-state-types/network"
api "github.com/filecoin-project/lotus/api"
types "github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockSealingAPI is a mock of SealingAPI interface.
@ -77,13 +79,12 @@ func (mr *MockSealingAPIMockRecorder) ChainGetMessage(arg0, arg1 interface{}) *g
}
// ChainHead mocks base method.
func (m *MockSealingAPI) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) {
func (m *MockSealingAPI) ChainHead(arg0 context.Context) (*types.TipSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(types.TipSetKey)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret0, _ := ret[0].(*types.TipSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainHead indicates an expected call of ChainHead.

View File

@ -60,13 +60,12 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{})
}
// ChainHead mocks base method.
func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) {
func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (*types.TipSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(types.TipSetKey)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret0, _ := ret[0].(*types.TipSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainHead indicates an expected call of ChainHead.

View File

@ -59,13 +59,12 @@ func (mr *MockPreCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface
}
// ChainHead mocks base method.
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (types.TipSetKey, abi.ChainEpoch, error) {
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (*types.TipSet, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(types.TipSetKey)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
ret0, _ := ret[0].(*types.TipSet)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainHead indicates an expected call of ChainHead.

View File

@ -31,7 +31,7 @@ type PreCommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error)
StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error)
}
@ -188,18 +188,18 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat
return nil, nil
}
tok, _, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
return nil, err
}
bf, err := b.api.ChainBaseFee(b.mctx, tok)
bf, err := b.api.ChainBaseFee(b.mctx, ts.Key())
if err != nil {
return nil, xerrors.Errorf("couldn't get base fee: %w", err)
}
// TODO: Drop this once nv14 has come and gone
nv, err := b.api.StateNetworkVersion(b.mctx, tok)
nv, err := b.api.StateNetworkVersion(b.mctx, ts.Key())
if err != nil {
return nil, xerrors.Errorf("couldn't get network version: %w", err)
}
@ -212,7 +212,7 @@ func (b *PreCommitBatcher) maybeStartBatch(notif bool) ([]sealiface.PreCommitBat
// todo support multiple batches
var res []sealiface.PreCommitBatchRes
if !individual {
res, err = b.processBatch(cfg, tok, bf, nv)
res, err = b.processBatch(cfg, ts.Key(), bf, nv)
} else {
res, err = b.processIndividually(cfg)
}
@ -378,13 +378,13 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tok types.TipSetKe
// register PreCommit, wait for batch message, return message CID
func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, deposit abi.TokenAmount, in *miner.SectorPreCommitInfo) (res sealiface.PreCommitBatchRes, err error) {
_, curEpoch, err := b.api.ChainHead(b.mctx)
ts, err := b.api.ChainHead(b.mctx)
if err != nil {
log.Errorf("getting chain head: %s", err)
return sealiface.PreCommitBatchRes{}, err
}
cutoff, err := getPreCommitCutoff(curEpoch, s)
cutoff, err := getPreCommitCutoff(ts.Height(), s)
if err != nil {
return sealiface.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err)
}

View File

@ -109,7 +109,7 @@ func TestPrecommitBatcher(t *testing.T) {
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
go func() {
defer done.Unlock()
@ -153,7 +153,7 @@ func TestPrecommitBatcher(t *testing.T) {
//stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise {
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(10001), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil)
@ -174,7 +174,7 @@ func TestPrecommitBatcher(t *testing.T) {
//stm: @CHAIN_STATE_MINER_INFO_001, @CHAIN_STATE_NETWORK_VERSION_001
expectSendsSingle := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *pipeline.PreCommitBatcher) promise {
s.EXPECT().ChainHead(gomock.Any()).Return(types.EmptyTSK, abi.ChainEpoch(1), nil)
s.EXPECT().ChainHead(gomock.Any()).Return(makeTs(t, 1), nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(9999), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil)

View File

@ -19,7 +19,7 @@ type PreCommitPolicy interface {
}
type Chain interface {
ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error)
ChainHead(context.Context) (*types.TipSet, error)
StateNetworkVersion(ctx context.Context, tok types.TipSetKey) (network.Version, error)
}
@ -59,7 +59,7 @@ func NewBasicPreCommitPolicy(api Chain, cfgGetter GetSealingConfigFunc, provingB
// Expiration produces the pre-commit sector expiration epoch for an encoded
// replica containing the provided enumeration of pieces and deals.
func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi.ChainEpoch, error) {
_, epoch, err := p.api.ChainHead(ctx)
ts, err := p.api.ChainHead(ctx)
if err != nil {
return 0, err
}
@ -71,8 +71,8 @@ func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi
continue
}
if p.DealInfo.DealSchedule.EndEpoch < epoch {
log.Warnf("piece schedule %+v ended before current epoch %d", p, epoch)
if p.DealInfo.DealSchedule.EndEpoch < ts.Height() {
log.Warnf("piece schedule %+v ended before current epoch %d", p, ts.Height())
continue
}
@ -89,13 +89,13 @@ func (p *BasicPreCommitPolicy) Expiration(ctx context.Context, ps ...Piece) (abi
return 0, err
}
tmp := epoch + expirationDuration
tmp := ts.Height() + expirationDuration
end = &tmp
}
// Ensure there is at least one day for the PC message to land without falling below min sector lifetime
// TODO: The "one day" should probably be a config, though it doesn't matter too much
minExp := epoch + policy.GetMinSectorExpiration() + miner.WPoStProvingPeriod
minExp := ts.Height() + policy.GetMinSectorExpiration() + miner.WPoStProvingPeriod
if *end < minExp {
end = &minExp
}

View File

@ -46,8 +46,8 @@ func (f *fakeChain) StateNetworkVersion(ctx context.Context, tok types.TipSetKey
return build.NewestNetworkVersion, nil
}
func (f *fakeChain) ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error) {
return types.NewTipSetKey(), f.h, nil
func (f *fakeChain) ChainHead(ctx context.Context) (*types.TipSet, error) {
return makeTs(nil, f.h), nil
}
func fakePieceCid(t *testing.T) cid.Cid {

View File

@ -69,7 +69,7 @@ type SealingAPI interface {
StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error)
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok types.TipSetKey) ([]api.Partition, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
ChainHead(ctx context.Context) (types.TipSetKey, abi.ChainEpoch, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainBaseFee(context.Context, types.TipSetKey) (abi.TokenAmount, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok types.TipSetKey) (abi.Randomness, error)

View File

@ -2,7 +2,6 @@ package sealing
import (
"context"
"github.com/filecoin-project/lotus/api"
"time"
"github.com/hashicorp/go-multierror"
@ -15,6 +14,7 @@ import (
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
)
@ -38,13 +38,13 @@ func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
}
func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitOnChainInfo, bool) {
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err)
return nil, false
}
info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
info, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
log.Errorf("handleSealPrecommit1Failed(%d): temp error: %+v", sector.SectorNumber, err)
return nil, false
@ -74,14 +74,14 @@ func (m *Sealing) handleSealPrecommit2Failed(ctx statemachine.Context, sector Se
}
func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, height, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
}
if sector.PreCommitMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.PreCommitMessage, api.LookbackNoLimit, true)
mw, err := m.Api.StateSearchMsg(ctx.Context(), ts.Key(), *sector.PreCommitMessage, api.LookbackNoLimit, true)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
@ -108,7 +108,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
}
}
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, ts.Key(), ts.Height(), m.Api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
@ -141,7 +141,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
if pci, is := m.checkPreCommitted(ctx, sector); is && pci != nil {
if sector.PreCommitMessage == nil {
log.Warnf("sector %d is precommitted on chain, but we don't have precommit message", sector.SectorNumber)
return ctx.Send(SectorPreCommitLanded{TipSet: tok})
return ctx.Send(SectorPreCommitLanded{TipSet: ts.Key()})
}
if pci.Info.SealedCID != *sector.CommR {
@ -210,13 +210,13 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
}
}
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitReplicaUpdateFailed: api error, not proceeding: %+v", err)
return nil
}
if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil {
if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, ts.Key(), m.Api); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleSubmitReplicaUpdateFailed: api error, not proceeding: %+v", err)
@ -239,7 +239,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
}
// Abort upgrade for sectors that went faulty since being marked for upgrade
active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
active, err := m.sectorActive(ctx.Context(), ts.Key(), sector.SectorNumber)
if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil
@ -263,14 +263,14 @@ func (m *Sealing) handleReleaseSectorKeyFailed(ctx statemachine.Context, sector
}
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if sector.CommitMessage != nil {
mw, err := m.Api.StateSearchMsg(ctx.Context(), tok, *sector.CommitMessage, api.LookbackNoLimit, true)
mw, err := m.Api.StateSearchMsg(ctx.Context(), ts.Key(), *sector.CommitMessage, api.LookbackNoLimit, true)
if err != nil {
// API error
if err := failedCooldown(ctx, sector); err != nil {
@ -297,7 +297,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
}
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
switch err.(type) {
case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
@ -437,7 +437,7 @@ func (m *Sealing) handleRecoverDealIDsOrFailWith(ctx statemachine.Context, secto
if err != nil {
return err
}
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return err
}
@ -459,7 +459,7 @@ func (m *Sealing) handleRecoverDealIDsOrFailWith(ctx statemachine.Context, secto
mdp := *p.DealInfo.DealProposal
dp = &mdp
}
res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), tok, dp, *p.DealInfo.PublishCid)
res, err := m.DealInfo.GetCurrentDealInfo(ctx.Context(), ts.Key(), dp, *p.DealInfo.PublishCid)
if err != nil {
failed[i] = xerrors.Errorf("getting current deal info for piece %d: %w", i, err)
continue
@ -510,7 +510,7 @@ func (m *Sealing) handleSnapDealsRecoverDealIDs(ctx statemachine.Context, sector
}
func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, maddr address.Address) ([]int, int, error) {
tok, height, err := api.ChainHead(ctx)
ts, err := api.ChainHead(ctx)
if err != nil {
return nil, 0, xerrors.Errorf("getting chain head: %w", err)
}
@ -530,7 +530,7 @@ func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo,
continue
}
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok)
proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, ts.Key())
if err != nil {
log.Warnf("getting deal %d for piece %d: %+v", p.DealInfo.DealID, i, err)
toFix = append(toFix, i)
@ -555,10 +555,10 @@ func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo,
continue
}
if height >= proposal.StartEpoch {
if ts.Height() >= proposal.StartEpoch {
// TODO: check if we are in an early enough state (before precommit), try to remove the offending pieces
// (tricky as we have to 'defragment' the sector while doing that, and update piece references for retrieval)
return nil, 0, xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, height)
return nil, 0, xerrors.Errorf("can't fix sector deals: piece %d (of %d) of sector %d refers expired deal %d - should start at %d, head %d", i, len(sector.Pieces), sector.SectorNumber, p.DealInfo.DealID, proposal.StartEpoch, ts.Height())
}
}

View File

@ -4,7 +4,6 @@ package sealing_test
import (
"bytes"
"context"
"github.com/filecoin-project/lotus/chain/types"
"testing"
"github.com/golang/mock/gomock"
@ -21,6 +20,7 @@ import (
api2 "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
pipeline "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/mocks"
)

View File

@ -1,7 +1,6 @@
package sealing
import (
"github.com/filecoin-project/lotus/api"
"time"
"golang.org/x/xerrors"
@ -9,6 +8,7 @@ import (
"github.com/filecoin-project/go-state-types/exitcode"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
@ -98,21 +98,21 @@ func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInf
func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error {
for {
tok, epoch, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)})
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)})
}
if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) {
if ts.Height() >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) {
return ctx.Send(SectorRemove{})
}
toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second
toWait := time.Duration(ts.Height()-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second
select {
case <-time.After(toWait):
continue

View File

@ -3,7 +3,6 @@ package sealing
import (
"bytes"
"context"
"github.com/filecoin-project/lotus/build"
"time"
"golang.org/x/xerrors"
@ -16,6 +15,7 @@ import (
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
)
@ -41,12 +41,12 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
return xerrors.Errorf("invalid sector %d with nil CommR", sector.SectorNumber)
}
// Abort upgrade for sectors that went faulty since being marked for upgrade
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err)
return nil
}
active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
active, err := m.sectorActive(ctx.Context(), ts.Key(), sector.SectorNumber)
if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil
@ -77,17 +77,17 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
return nil
}
if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil {
if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, ts.Key(), m.Api); err != nil {
return ctx.Send(SectorSubmitReplicaUpdateFailed{})
}
sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, tok)
sl, err := m.Api.StateSectorPartition(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
return nil
@ -121,7 +121,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
return xerrors.Errorf("getting config: %w", err)
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
onChainInfo, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
return nil
@ -144,7 +144,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
//ReplaceSectorNumber: 0,
}
collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, virtualPCI, tok)
collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, virtualPCI, ts.Key())
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}
@ -162,7 +162,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee))
mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok)
mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key())
if err != nil {
log.Errorf("handleSubmitReplicaUpdate: api error, not proceeding: %+v", err)
return nil
@ -241,12 +241,12 @@ func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector Sector
return err
}
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return err
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
return err
}

View File

@ -3,7 +3,6 @@ package sealing
import (
"bytes"
"context"
"github.com/filecoin-project/lotus/build"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
@ -21,6 +20,7 @@ import (
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
@ -120,7 +120,7 @@ func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.C
}
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
tok, epoch, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("getTicket: api error, not proceeding: %+v", err)
return nil, 0, false, nil
@ -134,13 +134,13 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
return nil, 0, false, nil
}
ticketEpoch := epoch - policy.SealRandomnessLookback
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
buf := new(bytes.Buffer)
if err := m.maddr.MarshalCBOR(buf); err != nil {
return nil, 0, allocated, err
}
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
}
@ -148,7 +148,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
if pci != nil {
ticketEpoch = pci.Info.SealRandEpoch
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
}
@ -162,7 +162,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
return nil, 0, allocated, xerrors.Errorf("getTicket: max prove commit duration policy error, not proceeding: %w", err)
}
if checkProveCommitExpired(pci.PreCommitEpoch, msd, epoch) {
if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) {
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
}
}
@ -171,7 +171,7 @@ func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.Se
return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
}
rand, err := m.Api.StateGetRandomnessFromTickets(ctx.Context(), crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), tok)
rand, err := m.Api.StateGetRandomnessFromTickets(ctx.Context(), crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
if err != nil {
return nil, 0, allocated, err
}
@ -217,14 +217,14 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
}
}
tok, height, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
}
if checkTicketExpired(sector.TicketEpoch, height) {
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
if checkTicketExpired(sector.TicketEpoch, ts.Height()) {
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
return nil
@ -234,7 +234,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorOldTicket{}) // go get new ticket
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
return nil
@ -252,7 +252,7 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
}
// if height > PreCommitEpoch + msd, there is no need to recalculate
if checkProveCommitExpired(pci.PreCommitEpoch, msd, height) {
if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) {
return ctx.Send(SectorOldTicket{}) // will be removed
}
}
@ -284,13 +284,13 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo)
}
func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitInfo, big.Int, types.TipSetKey, error) {
tok, height, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil, big.Zero(), types.EmptyTSK, nil
}
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.Api); err != nil {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, ts.Key(), ts.Height(), m.Api); err != nil {
switch err := err.(type) {
case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
@ -307,7 +307,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
case *ErrExpiredDeals:
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
case *ErrPrecommitOnChain:
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorPreCommitLanded{TipSet: ts.Key()}) // we re-did precommit
case *ErrSectorNumberAllocated:
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
// TODO: check if the sector is committed (not sure how we'd end up here)
@ -322,7 +322,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)})
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
if err != nil {
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)})
}
@ -341,7 +341,7 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
}
// Assume: both precommit msg & commit msg land on chain as early as possible
maxExpiration := height + policy.GetPreCommitChallengeDelay() + policy.GetMaxSectorExpirationExtension()
maxExpiration := ts.Height() + policy.GetPreCommitChallengeDelay() + policy.GetMaxSectorExpirationExtension()
if expiration > maxExpiration {
expiration = maxExpiration
}
@ -356,12 +356,12 @@ func (m *Sealing) preCommitParams(ctx statemachine.Context, sector SectorInfo) (
DealIDs: sector.dealIDs(),
}
collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, tok)
collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, ts.Key())
if err != nil {
return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("getting initial pledge collateral: %w", err)
}
return params, collateral, tok, nil
return params, collateral, ts.Key(), nil
}
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
@ -482,13 +482,13 @@ func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInf
}
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleWaitSeed: api error, not proceeding: %+v", err)
return nil
}
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
@ -501,7 +501,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
err = m.events.ChainAt(func(ectx context.Context, _ types.TipSetKey, curH abi.ChainEpoch) error {
// in case of null blocks the randomness can land after the tipset we
// get from the events API
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
@ -511,9 +511,9 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
if err := m.maddr.MarshalCBOR(buf); err != nil {
return err
}
rand, err := m.Api.StateGetRandomnessFromBeacon(ectx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes(), tok)
rand, err := m.Api.StateGetRandomnessFromBeacon(ectx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes(), ts.Key())
if err != nil {
err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, tok, err)
err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, ts.Key(), err)
_ = ctx.Send(SectorChainPreCommitFailed{error: err})
return err
@ -577,13 +577,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
}
{
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
if err := m.checkCommit(ctx.Context(), sector, proof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
}
@ -616,13 +616,13 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
}
}
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
@ -636,13 +636,13 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
}
mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, tok)
mi, err := m.Api.StateMinerInfo(ctx.Context(), m.maddr, ts.Key())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, tok)
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
@ -650,7 +650,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
return ctx.Send(SectorCommitFailed{error: xerrors.Errorf("precommit info not found on chain")})
}
collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, tok)
collateral, err := m.Api.StateMinerInitialPledgeCollateral(ctx.Context(), m.maddr, pci.Info, ts.Key())
if err != nil {
return xerrors.Errorf("getting initial pledge collateral: %w", err)
}
@ -701,13 +701,13 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
})
if err != nil || res.Error != "" {
tok, _, err := m.Api.ChainHead(ctx.Context())
ts, err := m.Api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}

View File

@ -25,16 +25,16 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("not a committed-capacity sector, has deals")
}
tok, head, err := m.Api.ChainHead(ctx)
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("couldnt get chain head: %w", err)
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, id, tok)
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, id, ts.Key())
if err != nil {
return xerrors.Errorf("failed to read sector on chain info: %w", err)
}
active, err := m.sectorActive(ctx, tok, id)
active, err := m.sectorActive(ctx, ts.Key(), id)
if err != nil {
return xerrors.Errorf("failed to check if sector is active")
}
@ -42,7 +42,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("cannot mark inactive sector for upgrade")
}
if onChainInfo.Expiration-head < market7.DealMinDuration {
if onChainInfo.Expiration-ts.Height() < market7.DealMinDuration {
return xerrors.Errorf("pointless to upgrade sector %d, expiration %d is less than a min deal duration away from current epoch."+
"Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration)
}