sealing pipeline: Remove adapter code!

This commit is contained in:
Łukasz Magiera 2022-06-16 16:05:56 +02:00
parent 59a4fe1e56
commit d56241603a
9 changed files with 20 additions and 192 deletions

View File

@ -1,29 +0,0 @@
package storage
import (
"context"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
)
var _ sealing.Events = new(EventsAdapter)
type EventsAdapter struct {
delegate *events.Events
}
func NewEventsAdapter(api *events.Events) EventsAdapter {
return EventsAdapter{delegate: api}
}
func (e EventsAdapter) ChainAt(hnd sealing.HeightHandler, rev sealing.RevertHandler, confidence int, h abi.ChainEpoch) error {
return e.delegate.ChainAt(context.TODO(), func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
return hnd(ctx, ts.Key(), curH)
}, func(ctx context.Context, ts *types.TipSet) error {
return rev(ctx, ts.Key())
}, confidence, h)
}

View File

@ -1,123 +0,0 @@
package storage
import (
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
minertypes "github.com/filecoin-project/go-state-types/builtin/v8/miner"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
pipeline "github.com/filecoin-project/lotus/storage/pipeline"
)
var _ pipeline.SealingAPI = new(SealingAPIAdapter)
type SealingAPIAdapter struct {
delegate fullNodeFilteredAPI
}
func NewSealingAPIAdapter(api fullNodeFilteredAPI) SealingAPIAdapter {
return SealingAPIAdapter{delegate: api}
}
func (s SealingAPIAdapter) StateMinerPreCommitDepositForPower(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tsk types.TipSetKey) (big.Int, error) {
return s.delegate.StateMinerPreCommitDepositForPower(ctx, a, pci, tsk)
}
func (s SealingAPIAdapter) StateMinerInitialPledgeCollateral(ctx context.Context, a address.Address, pci minertypes.SectorPreCommitInfo, tsk types.TipSetKey) (big.Int, error) {
return s.delegate.StateMinerInitialPledgeCollateral(ctx, a, pci, tsk)
}
func (s SealingAPIAdapter) StateMinerInfo(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (api.MinerInfo, error) {
return s.delegate.StateMinerInfo(ctx, maddr, tsk)
}
func (s SealingAPIAdapter) StateMinerAvailableBalance(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (big.Int, error) {
return s.delegate.StateMinerAvailableBalance(ctx, maddr, tsk)
}
func (s SealingAPIAdapter) StateMinerDeadlines(ctx context.Context, maddr address.Address, tsk types.TipSetKey) ([]api.Deadline, error) {
return s.delegate.StateMinerDeadlines(ctx, maddr, tsk)
}
func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr address.Address, sid abi.SectorNumber, tsk types.TipSetKey) (bool, error) {
return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk)
}
func (s SealingAPIAdapter) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) {
return s.delegate.StateSectorGetInfo(ctx, maddr, sectorNumber, tsk)
}
func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error) {
return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk)
}
func (s SealingAPIAdapter) StateLookupID(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) {
return s.delegate.StateLookupID(ctx, addr, tsk)
}
func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) {
return s.delegate.StateMarketStorageDeal(ctx, dealID, tsk)
}
func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) {
return s.delegate.StateNetworkVersion(ctx, tsk)
}
func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tsk types.TipSetKey) (*dline.Info, error) {
return s.delegate.StateMinerProvingDeadline(ctx, maddr, tsk)
}
func (s SealingAPIAdapter) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
return s.delegate.ChainGetMessage(ctx, mc)
}
func (s SealingAPIAdapter) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) {
return s.delegate.StateGetRandomnessFromBeacon(ctx, personalization, randEpoch, entropy, tsk)
}
func (s SealingAPIAdapter) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) {
return s.delegate.StateGetRandomnessFromTickets(ctx, personalization, randEpoch, entropy, tsk)
}
func (s SealingAPIAdapter) ChainReadObj(ctx context.Context, ocid cid.Cid) ([]byte, error) {
return s.delegate.ChainReadObj(ctx, ocid)
}
func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
return s.delegate.StateWaitMsg(ctx, cid, confidence, limit, allowReplaced)
}
func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) {
return s.delegate.StateSearchMsg(ctx, from, msg, limit, allowReplaced)
}
func (s SealingAPIAdapter) ChainHead(ctx context.Context) (*types.TipSet, error) {
return s.delegate.ChainHead(ctx)
}
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tsk types.TipSetKey) (cid.Cid, error) {
return s.delegate.StateComputeDataCID(ctx, maddr, sectorType, deals, tsk)
}
func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorLocation, error) {
return s.delegate.StateSectorPartition(ctx, maddr, sectorNumber, tsk)
}
func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*minertypes.SectorPreCommitOnChainInfo, error) {
return s.delegate.StateSectorPreCommitInfo(ctx, maddr, sectorNumber, tsk)
}
func (s SealingAPIAdapter) MpoolPushMessage(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec) (*types.SignedMessage, error) {
return s.delegate.MpoolPushMessage(ctx, msg, mss)
}

View File

@ -177,19 +177,12 @@ func (m *Miner) Run(ctx context.Context) error {
if err != nil {
return xerrors.Errorf("failed to subscribe to events: %w", err)
}
evtsAdapter := NewEventsAdapter(evts)
// Create a shim to glue the API required by the sealing component
// with the API that Lotus is capable of providing.
// The shim translates between "tipset tokens" and tipset keys, and
// provides extra methods.
adaptedAPI := NewSealingAPIAdapter(m.api)
// Instantiate a precommit policy.
cfg := pipeline.GetSealingConfigFunc(m.getSealConfig)
provingBuffer := md.WPoStProvingPeriod * 2
pcp := pipeline.NewBasicPreCommitPolicy(adaptedAPI, cfg, provingBuffer)
pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer)
// address selector.
as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
@ -197,7 +190,7 @@ func (m *Miner) Run(ctx context.Context) error {
}
// Instantiate the sealing FSM.
m.sealing = pipeline.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)
m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.handleSealingNotifications, as)
// Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function

View File

@ -96,7 +96,7 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t
return err
}
commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok)
commD, err := api.StateComputeDataCID(ctx, maddr, si.SectorType, si.dealIDs(), tok)
if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
}
@ -220,7 +220,7 @@ func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInf
return xerrors.Errorf("replica update on sector not marked for update")
}
commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok)
commD, err := api.StateComputeDataCID(ctx, maddr, si.SectorType, si.dealIDs(), tok)
if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
}

View File

@ -1,17 +0,0 @@
package sealing
import (
"context"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
)
// `curH`-`ts.Height` = `confidence`
type HeightHandler func(ctx context.Context, tok types.TipSetKey, curH abi.ChainEpoch) error
type RevertHandler func(ctx context.Context, tok types.TipSetKey) error
type Events interface {
ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error
}

View File

@ -107,19 +107,19 @@ func (mr *MockSealingAPIMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interfac
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2)
}
// StateComputeDataCommitment mocks base method.
func (m *MockSealingAPI) StateComputeDataCommitment(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) {
// StateComputeDataCID mocks base method.
func (m *MockSealingAPI) StateComputeDataCID(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateComputeDataCommitment", arg0, arg1, arg2, arg3, arg4)
ret := m.ctrl.Call(m, "StateComputeDataCID", arg0, arg1, arg2, arg3, arg4)
ret0, _ := ret[0].(cid.Cid)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateComputeDataCommitment indicates an expected call of StateComputeDataCommitment.
func (mr *MockSealingAPIMockRecorder) StateComputeDataCommitment(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
// StateComputeDataCID indicates an expected call of StateComputeDataCID.
func (mr *MockSealingAPIMockRecorder) StateComputeDataCID(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCommitment", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCommitment), arg0, arg1, arg2, arg3, arg4)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCID", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCID), arg0, arg1, arg2, arg3, arg4)
}
// StateGetRandomnessFromBeacon mocks base method.

View File

@ -23,6 +23,7 @@ import (
"github.com/filecoin-project/lotus/api"
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
@ -41,10 +42,9 @@ var log = logging.Logger("sectors")
type SealingAPI interface {
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error)
// Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
StateComputeDataCID(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok types.TipSetKey) (cid.Cid, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
@ -70,6 +70,10 @@ type SectorStateNotifee func(before, after SectorInfo)
type AddrSel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)
type Events interface {
ChainAt(ctx context.Context, hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error
}
type Sealing struct {
Api SealingAPI
DealInfo *CurrentDealInfoManager

View File

@ -255,9 +255,9 @@ func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector Sector
targetHeight := mw.Height + lb + InteractivePoRepConfidence
return m.events.ChainAt(func(context.Context, types.TipSetKey, abi.ChainEpoch) error {
return m.events.ChainAt(context.Background(), func(context.Context, *types.TipSet, abi.ChainEpoch) error {
return ctx.Send(SectorUpdateActive{})
}, func(ctx context.Context, ts types.TipSetKey) error {
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("revert in handleUpdateActivating")
return nil
}, InteractivePoRepConfidence, targetHeight)

View File

@ -498,7 +498,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay()
err = m.events.ChainAt(func(ectx context.Context, _ types.TipSetKey, curH abi.ChainEpoch) error {
err = m.events.ChainAt(context.Background(), func(ectx context.Context, _ *types.TipSet, curH abi.ChainEpoch) error {
// in case of null blocks the randomness can land after the tipset we
// get from the events API
ts, err := m.Api.ChainHead(ctx.Context())
@ -522,7 +522,7 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
_ = ctx.Send(SectorSeedReady{SeedValue: abi.InteractiveSealRandomness(rand), SeedEpoch: randHeight})
return nil
}, func(ctx context.Context, ts types.TipSetKey) error {
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step")
// TODO: need to cancel running process and restart...
return nil