plumb AddressSelector to stoage fsm
This commit is contained in:
parent
783971d2c8
commit
2fd93a55ac
@ -201,6 +201,14 @@ func (st *SealSeed) Equals(ost *SealSeed) bool {
|
|||||||
|
|
||||||
type SectorState string
|
type SectorState string
|
||||||
|
|
||||||
|
type AddrUse int
|
||||||
|
|
||||||
|
const (
|
||||||
|
PreCommitAddr AddrUse = iota
|
||||||
|
CommitAddr
|
||||||
|
PoStAddr
|
||||||
|
)
|
||||||
|
|
||||||
type AddressConfig struct {
|
type AddressConfig struct {
|
||||||
PreCommitControl []address.Address
|
PreCommitControl []address.Address
|
||||||
CommitControl []address.Address
|
CommitControl []address.Address
|
||||||
|
13
extern/storage-sealing/sealing.go
vendored
13
extern/storage-sealing/sealing.go
vendored
@ -14,15 +14,16 @@ import (
|
|||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/network"
|
|
||||||
"github.com/filecoin-project/specs-storage/storage"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
padreader "github.com/filecoin-project/go-padreader"
|
padreader "github.com/filecoin-project/go-padreader"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
|
"github.com/filecoin-project/go-state-types/network"
|
||||||
statemachine "github.com/filecoin-project/go-statemachine"
|
statemachine "github.com/filecoin-project/go-statemachine"
|
||||||
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
@ -68,6 +69,8 @@ type SealingAPI interface {
|
|||||||
|
|
||||||
type SectorStateNotifee func(before, after SectorInfo)
|
type SectorStateNotifee func(before, after SectorInfo)
|
||||||
|
|
||||||
|
type AddrSel func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)
|
||||||
|
|
||||||
type Sealing struct {
|
type Sealing struct {
|
||||||
api SealingAPI
|
api SealingAPI
|
||||||
feeCfg FeeConfig
|
feeCfg FeeConfig
|
||||||
@ -87,6 +90,7 @@ type Sealing struct {
|
|||||||
toUpgrade map[abi.SectorNumber]struct{}
|
toUpgrade map[abi.SectorNumber]struct{}
|
||||||
|
|
||||||
notifee SectorStateNotifee
|
notifee SectorStateNotifee
|
||||||
|
addrSel AddrSel
|
||||||
|
|
||||||
stats SectorStats
|
stats SectorStats
|
||||||
|
|
||||||
@ -111,7 +115,7 @@ type UnsealedSectorInfo struct {
|
|||||||
ssize abi.SectorSize
|
ssize abi.SectorSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing {
|
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel) *Sealing {
|
||||||
s := &Sealing{
|
s := &Sealing{
|
||||||
api: api,
|
api: api,
|
||||||
feeCfg: fc,
|
feeCfg: fc,
|
||||||
@ -130,6 +134,7 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||||
|
|
||||||
notifee: notifee,
|
notifee: notifee,
|
||||||
|
addrSel: as,
|
||||||
|
|
||||||
getConfig: gc,
|
getConfig: gc,
|
||||||
|
|
||||||
|
22
extern/storage-sealing/states_sealing.go
vendored
22
extern/storage-sealing/states_sealing.go
vendored
@ -15,6 +15,7 @@ import (
|
|||||||
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
|
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-storage/storage"
|
"github.com/filecoin-project/specs-storage/storage"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||||
@ -191,7 +192,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
|
mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
||||||
return nil
|
return nil
|
||||||
@ -266,9 +267,15 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
}
|
}
|
||||||
|
|
||||||
deposit := big.Max(depositMinimum, collateral)
|
deposit := big.Max(depositMinimum, collateral)
|
||||||
|
goodFunds := big.Add(deposit, m.feeCfg.MaxPreCommitGasFee)
|
||||||
|
|
||||||
|
from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no good address to send precommit message from", err)})
|
||||||
|
}
|
||||||
|
|
||||||
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
|
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
|
||||||
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, miner.Methods.PreCommitSector, deposit, m.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.PreCommitSector, deposit, m.feeCfg.MaxPreCommitGasFee, enc.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if params.ReplaceCapacity {
|
if params.ReplaceCapacity {
|
||||||
m.remarkForUpgrade(params.ReplaceSectorNumber)
|
m.remarkForUpgrade(params.ReplaceSectorNumber)
|
||||||
@ -426,7 +433,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
|
|||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
|
||||||
}
|
}
|
||||||
|
|
||||||
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
|
mi, err := m.api.StateMinerInfo(ctx.Context(), m.maddr, tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
||||||
return nil
|
return nil
|
||||||
@ -450,8 +457,15 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
|
|||||||
collateral = big.Zero()
|
collateral = big.Zero()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
goodFunds := big.Add(collateral, m.feeCfg.MaxCommitGasFee)
|
||||||
|
|
||||||
|
from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, collateral)
|
||||||
|
if err != nil {
|
||||||
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from", err)})
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: check seed / ticket / deals are up to date
|
// TODO: check seed / ticket / deals are up to date
|
||||||
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, miner.Methods.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes())
|
mcid, err := m.api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveCommitSector, collateral, m.feeCfg.MaxCommitGasFee, enc.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm, err := storage.NewMiner(api, maddr, h, ds, sealer, sc, verif, gsd, fc, j)
|
sm, err := storage.NewMiner(api, maddr, h, ds, sealer, sc, verif, gsd, fc, j, as)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -11,14 +11,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AddrUse int
|
|
||||||
|
|
||||||
const (
|
|
||||||
PreCommitAddr AddrUse = iota
|
|
||||||
CommitAddr
|
|
||||||
PoStAddr
|
|
||||||
)
|
|
||||||
|
|
||||||
type addrSelectApi interface {
|
type addrSelectApi interface {
|
||||||
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
@ -31,12 +23,12 @@ type AddressSelector struct {
|
|||||||
api.AddressConfig
|
api.AddressConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi miner.MinerInfo, use AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
|
func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
|
||||||
var addrs []address.Address
|
var addrs []address.Address
|
||||||
switch use {
|
switch use {
|
||||||
case PreCommitAddr:
|
case api.PreCommitAddr:
|
||||||
addrs = append(addrs, as.PreCommitControl...)
|
addrs = append(addrs, as.PreCommitControl...)
|
||||||
case CommitAddr:
|
case api.CommitAddr:
|
||||||
addrs = append(addrs, as.CommitControl...)
|
addrs = append(addrs, as.CommitControl...)
|
||||||
default:
|
default:
|
||||||
defaultCtl := map[address.Address]struct{}{}
|
defaultCtl := map[address.Address]struct{}{}
|
||||||
|
@ -41,13 +41,14 @@ import (
|
|||||||
var log = logging.Logger("storageminer")
|
var log = logging.Logger("storageminer")
|
||||||
|
|
||||||
type Miner struct {
|
type Miner struct {
|
||||||
api storageMinerApi
|
api storageMinerApi
|
||||||
feeCfg config.MinerFeeConfig
|
feeCfg config.MinerFeeConfig
|
||||||
h host.Host
|
h host.Host
|
||||||
sealer sectorstorage.SectorManager
|
sealer sectorstorage.SectorManager
|
||||||
ds datastore.Batching
|
ds datastore.Batching
|
||||||
sc sealing.SectorIDCounter
|
sc sealing.SectorIDCounter
|
||||||
verif ffiwrapper.Verifier
|
verif ffiwrapper.Verifier
|
||||||
|
addrSel *AddressSelector
|
||||||
|
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
|
|
||||||
@ -114,15 +115,16 @@ type storageMinerApi interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) {
|
func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal, as *AddressSelector) (*Miner, error) {
|
||||||
m := &Miner{
|
m := &Miner{
|
||||||
api: api,
|
api: api,
|
||||||
feeCfg: feeCfg,
|
feeCfg: feeCfg,
|
||||||
h: h,
|
h: h,
|
||||||
sealer: sealer,
|
sealer: sealer,
|
||||||
ds: ds,
|
ds: ds,
|
||||||
sc: sc,
|
sc: sc,
|
||||||
verif: verif,
|
verif: verif,
|
||||||
|
addrSel: as,
|
||||||
|
|
||||||
maddr: maddr,
|
maddr: maddr,
|
||||||
getSealConfig: gsd,
|
getSealConfig: gsd,
|
||||||
@ -152,7 +154,12 @@ func (m *Miner) Run(ctx context.Context) error {
|
|||||||
adaptedAPI := NewSealingAPIAdapter(m.api)
|
adaptedAPI := NewSealingAPIAdapter(m.api)
|
||||||
// TODO: Maybe we update this policy after actor upgrades?
|
// TODO: Maybe we update this policy after actor upgrades?
|
||||||
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod)
|
pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, policy.GetMaxSectorExpirationExtension()-(md.WPoStProvingPeriod*2), md.PeriodStart%md.WPoStProvingPeriod)
|
||||||
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications)
|
|
||||||
|
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
|
||||||
|
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig), m.handleSealingNotifications, as)
|
||||||
|
|
||||||
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
|
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function
|
||||||
|
|
||||||
|
@ -799,7 +799,7 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
|
|||||||
goodFunds := big.Add(msg.RequiredFunds(), msg.Value)
|
goodFunds := big.Add(msg.RequiredFunds(), msg.Value)
|
||||||
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
|
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
|
||||||
|
|
||||||
pa, avail, err := s.addrSel.AddressFor(ctx, s.api, mi, PoStAddr, goodFunds, minFunds)
|
pa, avail, err := s.addrSel.AddressFor(ctx, s.api, mi, api.PoStAddr, goodFunds, minFunds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorw("error selecting address for window post", "error", err)
|
log.Errorw("error selecting address for window post", "error", err)
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user