sealing pipeline: Cleanup AddresSelector plumbing

This commit is contained in:
Łukasz Magiera 2022-08-09 12:57:20 +02:00
parent 4b6a9e0387
commit fe2a589890
13 changed files with 232 additions and 37 deletions

View File

@ -15,7 +15,7 @@ import (
var log = logging.Logger("ctladdr") var log = logging.Logger("ctladdr")
type addrSelectApi interface { type NodeApi interface {
WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error) WalletHas(context.Context, address.Address) (bool, error)
@ -27,7 +27,7 @@ type AddressSelector struct {
api.AddressConfig api.AddressConfig
} }
func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { func (as *AddressSelector) AddressFor(ctx context.Context, a NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
if as == nil { if as == nil {
// should only happen in some tests // should only happen in some tests
log.Warnw("smart address selection disabled, using worker address") log.Warnw("smart address selection disabled, using worker address")
@ -85,7 +85,7 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi a
return pickAddress(ctx, a, mi, goodFunds, minFunds, addrs) return pickAddress(ctx, a, mi, goodFunds, minFunds, addrs)
} }
func pickAddress(ctx context.Context, a addrSelectApi, mi api.MinerInfo, goodFunds, minFunds abi.TokenAmount, addrs []address.Address) (address.Address, abi.TokenAmount, error) { func pickAddress(ctx context.Context, a NodeApi, mi api.MinerInfo, goodFunds, minFunds abi.TokenAmount, addrs []address.Address) (address.Address, abi.TokenAmount, error) {
leastBad := mi.Worker leastBad := mi.Worker
bestAvail := minFunds bestAvail := minFunds
@ -119,7 +119,7 @@ func pickAddress(ctx context.Context, a addrSelectApi, mi api.MinerInfo, goodFun
return leastBad, bestAvail, nil return leastBad, bestAvail, nil
} }
func maybeUseAddress(ctx context.Context, a addrSelectApi, addr address.Address, goodFunds abi.TokenAmount, leastBad *address.Address, bestAvail *abi.TokenAmount) bool { func maybeUseAddress(ctx context.Context, a NodeApi, addr address.Address, goodFunds abi.TokenAmount, leastBad *address.Address, bestAvail *abi.TokenAmount) bool {
b, err := a.WalletBalance(ctx, addr) b, err := a.WalletBalance(ctx, addr)
if err != nil { if err != nil {
log.Errorw("checking control address balance", "addr", addr, "error", err) log.Errorw("checking control address balance", "addr", addr, "error", err)

View File

@ -171,13 +171,8 @@ func (m *Miner) Run(ctx context.Context) error {
pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer) pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer)
// address selector.
as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds)
}
// Instantiate the sealing FSM. // Instantiate the sealing FSM.
m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, as) m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, m.addrSel)
// Run the sealing FSM. // Run the sealing FSM.
go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function

View File

@ -43,6 +43,12 @@ type CommitBatcherApi interface {
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
// Address selector
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
} }
type AggregateInput struct { type AggregateInput struct {
@ -55,7 +61,7 @@ type CommitBatcher struct {
api CommitBatcherApi api CommitBatcherApi
maddr address.Address maddr address.Address
mctx context.Context mctx context.Context
addrSel AddrSel addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
prover storiface.Prover prover storiface.Prover
@ -69,7 +75,7 @@ type CommitBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher {
b := &CommitBatcher{ b := &CommitBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,
@ -363,7 +369,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
goodFunds := big.Add(maxFee, needFunds) goodFunds := big.Add(maxFee, needFunds)
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds) from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds)
if err != nil { if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
} }
@ -458,7 +464,7 @@ func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, av
goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee)) goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee))
from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err) return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
} }

View File

@ -4,6 +4,7 @@ package sealing_test
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/lotus/storage/ctladdr"
"sort" "sort"
"sync" "sync"
"testing" "testing"
@ -37,9 +38,9 @@ func TestCommitBatcher(t *testing.T) {
ctx := context.Background() ctx := context.Background()
as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { as := asel(func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil return t0123, big.Zero(), nil
} })
maxBatch := miner5.MaxAggregatedSectors maxBatch := miner5.MaxAggregatedSectors
minBatch := miner5.MinAggregatedSectors minBatch := miner5.MinAggregatedSectors
@ -438,3 +439,11 @@ var dummySmsg = &types.SignedMessage{
}, },
Signature: crypto.Signature{Type: crypto.SigTypeBLS}, Signature: crypto.Signature{Type: crypto.SigTypeBLS},
} }
type asel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)
func (s asel) AddressFor(ctx context.Context, _ ctladdr.NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return s(ctx, mi, use, goodFunds, minFunds)
}
var _ pipeline.AddressSelector = asel(nil)

View File

@ -107,6 +107,21 @@ func (mr *MockSealingAPIMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interfac
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2)
} }
// StateAccountKey mocks base method.
func (m *MockSealingAPI) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2)
ret0, _ := ret[0].(address.Address)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateAccountKey indicates an expected call of StateAccountKey.
func (mr *MockSealingAPIMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockSealingAPI)(nil).StateAccountKey), arg0, arg1, arg2)
}
// StateComputeDataCID mocks base method. // StateComputeDataCID mocks base method.
func (m *MockSealingAPI) StateComputeDataCID(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) { func (m *MockSealingAPI) StateComputeDataCID(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -391,3 +406,33 @@ func (mr *MockSealingAPIMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4)
} }
// WalletBalance mocks base method.
func (m *MockSealingAPI) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletBalance indicates an expected call of WalletBalance.
func (mr *MockSealingAPIMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockSealingAPI)(nil).WalletBalance), arg0, arg1)
}
// WalletHas mocks base method.
func (m *MockSealingAPI) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletHas", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletHas indicates an expected call of WalletHas.
func (mr *MockSealingAPIMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockSealingAPI)(nil).WalletHas), arg0, arg1)
}

View File

@ -73,6 +73,36 @@ func (mr *MockCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2 in
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2)
} }
// StateAccountKey mocks base method.
func (m *MockCommitBatcherApi) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2)
ret0, _ := ret[0].(address.Address)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateAccountKey indicates an expected call of StateAccountKey.
func (mr *MockCommitBatcherApiMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateAccountKey), arg0, arg1, arg2)
}
// StateLookupID mocks base method.
func (m *MockCommitBatcherApi) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateLookupID", arg0, arg1, arg2)
ret0, _ := ret[0].(address.Address)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateLookupID indicates an expected call of StateLookupID.
func (mr *MockCommitBatcherApiMockRecorder) StateLookupID(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateLookupID", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateLookupID), arg0, arg1, arg2)
}
// StateMinerAvailableBalance mocks base method. // StateMinerAvailableBalance mocks base method.
func (m *MockCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) { func (m *MockCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -147,3 +177,33 @@ func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1,
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3)
} }
// WalletBalance mocks base method.
func (m *MockCommitBatcherApi) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletBalance indicates an expected call of WalletBalance.
func (mr *MockCommitBatcherApiMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockCommitBatcherApi)(nil).WalletBalance), arg0, arg1)
}
// WalletHas mocks base method.
func (m *MockCommitBatcherApi) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletHas", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletHas indicates an expected call of WalletHas.
func (mr *MockCommitBatcherApiMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockCommitBatcherApi)(nil).WalletHas), arg0, arg1)
}

View File

@ -71,6 +71,36 @@ func (mr *MockPreCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2)
} }
// StateAccountKey mocks base method.
func (m *MockPreCommitBatcherApi) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2)
ret0, _ := ret[0].(address.Address)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateAccountKey indicates an expected call of StateAccountKey.
func (mr *MockPreCommitBatcherApiMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateAccountKey), arg0, arg1, arg2)
}
// StateLookupID mocks base method.
func (m *MockPreCommitBatcherApi) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateLookupID", arg0, arg1, arg2)
ret0, _ := ret[0].(address.Address)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateLookupID indicates an expected call of StateLookupID.
func (mr *MockPreCommitBatcherApiMockRecorder) StateLookupID(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateLookupID", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateLookupID), arg0, arg1, arg2)
}
// StateMinerAvailableBalance mocks base method. // StateMinerAvailableBalance mocks base method.
func (m *MockPreCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) { func (m *MockPreCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -115,3 +145,33 @@ func (mr *MockPreCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 in
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1)
} }
// WalletBalance mocks base method.
func (m *MockPreCommitBatcherApi) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletBalance indicates an expected call of WalletBalance.
func (mr *MockPreCommitBatcherApiMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).WalletBalance), arg0, arg1)
}
// WalletHas mocks base method.
func (m *MockPreCommitBatcherApi) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WalletHas", arg0, arg1)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// WalletHas indicates an expected call of WalletHas.
func (mr *MockPreCommitBatcherApiMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).WalletHas), arg0, arg1)
}

View File

@ -33,6 +33,12 @@ type PreCommitBatcherApi interface {
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
ChainHead(ctx context.Context) (*types.TipSet, error) ChainHead(ctx context.Context) (*types.TipSet, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
// Address selector
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
} }
type preCommitEntry struct { type preCommitEntry struct {
@ -44,7 +50,7 @@ type PreCommitBatcher struct {
api PreCommitBatcherApi api PreCommitBatcherApi
maddr address.Address maddr address.Address
mctx context.Context mctx context.Context
addrSel AddrSel addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
@ -57,7 +63,7 @@ type PreCommitBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher {
b := &PreCommitBatcher{ b := &PreCommitBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,
@ -296,7 +302,7 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo,
goodFunds := big.Add(deposit, big.Int(b.feeCfg.MaxPreCommitGasFee)) goodFunds := big.Add(deposit, big.Int(b.feeCfg.MaxPreCommitGasFee))
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("no good address to send precommit message from: %w", err) return cid.Undef, xerrors.Errorf("no good address to send precommit message from: %w", err)
} }
@ -353,7 +359,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKe
goodFunds := big.Add(maxFee, needFunds) goodFunds := big.Add(maxFee, needFunds)
from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil { if err != nil {
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
} }

View File

@ -42,9 +42,9 @@ func TestPrecommitBatcher(t *testing.T) {
ctx := context.Background() ctx := context.Background()
as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { as := asel(func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil return t0123, big.Zero(), nil
} })
maxBatch := miner6.PreCommitSectorBatchMaxSize maxBatch := miner6.PreCommitSectorBatchMaxSize

View File

@ -2,7 +2,6 @@ package sealing
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/journal"
"sync" "sync"
"time" "time"
@ -25,7 +24,9 @@ import (
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -64,16 +65,23 @@ type SealingAPI interface {
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error)
// Address selector
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
} }
type SectorStateNotifee func(before, after SectorInfo) type SectorStateNotifee func(before, after SectorInfo)
type AddrSel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)
type Events interface { type Events interface {
ChainAt(ctx context.Context, hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error ChainAt(ctx context.Context, hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error
} }
type AddressSelector interface {
AddressFor(ctx context.Context, a ctladdr.NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error)
}
type Sealing struct { type Sealing struct {
Api SealingAPI Api SealingAPI
DealInfo *CurrentDealInfoManager DealInfo *CurrentDealInfoManager
@ -103,7 +111,7 @@ type Sealing struct {
journal journal.Journal journal journal.Journal
sealingEvtType journal.EventType sealingEvtType journal.EventType
notifee SectorStateNotifee notifee SectorStateNotifee
addrSel AddrSel addrSel AddressSelector
stats SectorStats stats SectorStats
@ -152,7 +160,7 @@ type pendingPiece struct {
accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error)
} }
func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, as AddrSel) *Sealing { func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing {
s := &Sealing{ s := &Sealing{
Api: api, Api: api,
DealInfo: &CurrentDealInfoManager{api}, DealInfo: &CurrentDealInfoManager{api},
@ -176,11 +184,11 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
journal: journal, journal: journal,
sealingEvtType: journal.RegisterEventType("storage", "sealing_states"), sealingEvtType: journal.RegisterEventType("storage", "sealing_states"),
addrSel: as, addrSel: addrSel,
terminator: NewTerminationBatcher(mctx, maddr, api, as, fc, gc), terminator: NewTerminationBatcher(mctx, maddr, api, addrSel, fc, gc),
precommiter: NewPreCommitBatcher(mctx, maddr, api, as, fc, gc), precommiter: NewPreCommitBatcher(mctx, maddr, api, addrSel, fc, gc),
commiter: NewCommitBatcher(mctx, maddr, api, as, fc, gc, prov), commiter: NewCommitBatcher(mctx, maddr, api, addrSel, fc, gc, prov),
getConfig: gc, getConfig: gc,

View File

@ -168,7 +168,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
return nil return nil
} }
from, _, err := m.addrSel(ctx.Context(), mi, api.CommitAddr, goodFunds, collateral) from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil { if err != nil {
log.Errorf("no good address to send replica update message from: %+v", err) log.Errorf("no good address to send replica update message from: %+v", err)
return ctx.Send(SectorSubmitReplicaUpdateFailed{}) return ctx.Send(SectorSubmitReplicaUpdateFailed{})

View File

@ -407,7 +407,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
goodFunds := big.Add(deposit, big.Int(m.feeCfg.MaxPreCommitGasFee)) goodFunds := big.Add(deposit, big.Int(m.feeCfg.MaxPreCommitGasFee))
from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit) from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.PreCommitAddr, goodFunds, deposit)
if err != nil { if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no good address to send precommit message from: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no good address to send precommit message from: %w", err)})
} }
@ -667,7 +667,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee)) goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee))
from, _, err := m.addrSel(ctx.Context(), mi, api.CommitAddr, goodFunds, collateral) from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral)
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)})
} }

View File

@ -30,13 +30,19 @@ type TerminateBatcherApi interface {
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error) StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error)
// Address selector
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
} }
type TerminateBatcher struct { type TerminateBatcher struct {
api TerminateBatcherApi api TerminateBatcherApi
maddr address.Address maddr address.Address
mctx context.Context mctx context.Context
addrSel AddrSel addrSel AddressSelector
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
getConfig GetSealingConfigFunc getConfig GetSealingConfigFunc
@ -49,7 +55,7 @@ type TerminateBatcher struct {
lk sync.Mutex lk sync.Mutex
} }
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher { func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher {
b := &TerminateBatcher{ b := &TerminateBatcher{
api: api, api: api,
maddr: maddr, maddr: maddr,
@ -217,7 +223,7 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, xerrors.Errorf("couldn't get miner info: %w", err) return nil, xerrors.Errorf("couldn't get miner info: %w", err)
} }
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, big.Int(b.feeCfg.MaxTerminateGasFee), big.Int(b.feeCfg.MaxTerminateGasFee)) from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.TerminateSectorsAddr, big.Int(b.feeCfg.MaxTerminateGasFee), big.Int(b.feeCfg.MaxTerminateGasFee))
if err != nil { if err != nil {
return nil, xerrors.Errorf("no good address found: %w", err) return nil, xerrors.Errorf("no good address found: %w", err)
} }