From fe2a5898907035091b8c105105b1fd0ee58b70bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 9 Aug 2022 12:57:20 +0200 Subject: [PATCH] sealing pipeline: Cleanup AddresSelector plumbing --- storage/ctladdr/addresses.go | 8 +-- storage/miner.go | 7 +-- storage/pipeline/commit_batch.go | 14 +++-- storage/pipeline/commit_batch_test.go | 13 +++- storage/pipeline/mocks/api.go | 45 ++++++++++++++ storage/pipeline/mocks/mock_commit_batcher.go | 60 +++++++++++++++++++ .../pipeline/mocks/mock_precommit_batcher.go | 60 +++++++++++++++++++ storage/pipeline/precommit_batch.go | 14 +++-- storage/pipeline/precommit_batch_test.go | 4 +- storage/pipeline/sealing.go | 26 +++++--- storage/pipeline/states_replica_update.go | 2 +- storage/pipeline/states_sealing.go | 4 +- storage/pipeline/terminate_batch.go | 12 +++- 13 files changed, 232 insertions(+), 37 deletions(-) diff --git a/storage/ctladdr/addresses.go b/storage/ctladdr/addresses.go index 701fea933..ee778cb38 100644 --- a/storage/ctladdr/addresses.go +++ b/storage/ctladdr/addresses.go @@ -15,7 +15,7 @@ import ( var log = logging.Logger("ctladdr") -type addrSelectApi interface { +type NodeApi interface { WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletHas(context.Context, address.Address) (bool, error) @@ -27,7 +27,7 @@ type AddressSelector struct { api.AddressConfig } -func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { +func (as *AddressSelector) AddressFor(ctx context.Context, a NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { if as == nil { // should only happen in some tests log.Warnw("smart address selection disabled, using worker address") @@ -85,7 +85,7 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi a return pickAddress(ctx, a, mi, goodFunds, minFunds, addrs) } -func pickAddress(ctx context.Context, a addrSelectApi, mi api.MinerInfo, goodFunds, minFunds abi.TokenAmount, addrs []address.Address) (address.Address, abi.TokenAmount, error) { +func pickAddress(ctx context.Context, a NodeApi, mi api.MinerInfo, goodFunds, minFunds abi.TokenAmount, addrs []address.Address) (address.Address, abi.TokenAmount, error) { leastBad := mi.Worker bestAvail := minFunds @@ -119,7 +119,7 @@ func pickAddress(ctx context.Context, a addrSelectApi, mi api.MinerInfo, goodFun return leastBad, bestAvail, nil } -func maybeUseAddress(ctx context.Context, a addrSelectApi, addr address.Address, goodFunds abi.TokenAmount, leastBad *address.Address, bestAvail *abi.TokenAmount) bool { +func maybeUseAddress(ctx context.Context, a NodeApi, addr address.Address, goodFunds abi.TokenAmount, leastBad *address.Address, bestAvail *abi.TokenAmount) bool { b, err := a.WalletBalance(ctx, addr) if err != nil { log.Errorw("checking control address balance", "addr", addr, "error", err) diff --git a/storage/miner.go b/storage/miner.go index 0c5319e94..fadd94962 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -171,13 +171,8 @@ func (m *Miner) Run(ctx context.Context) error { pcp := pipeline.NewBasicPreCommitPolicy(m.api, cfg, provingBuffer) - // address selector. - as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { - return m.addrSel.AddressFor(ctx, m.api, mi, use, goodFunds, minFunds) - } - // Instantiate the sealing FSM. - m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, as) + m.sealing = pipeline.New(ctx, m.api, m.feeCfg, evts, m.maddr, m.ds, m.sealer, m.sc, m.verif, m.prover, &pcp, cfg, m.journal, m.addrSel) // Run the sealing FSM. go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function diff --git a/storage/pipeline/commit_batch.go b/storage/pipeline/commit_batch.go index 1e1993f6a..8ab7d77c5 100644 --- a/storage/pipeline/commit_batch.go +++ b/storage/pipeline/commit_batch.go @@ -43,6 +43,12 @@ type CommitBatcherApi interface { StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (big.Int, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) + + // Address selector + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) } type AggregateInput struct { @@ -55,7 +61,7 @@ type CommitBatcher struct { api CommitBatcherApi maddr address.Address mctx context.Context - addrSel AddrSel + addrSel AddressSelector feeCfg config.MinerFeeConfig getConfig GetSealingConfigFunc prover storiface.Prover @@ -69,7 +75,7 @@ type CommitBatcher struct { lk sync.Mutex } -func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { +func NewCommitBatcher(mctx context.Context, maddr address.Address, api CommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc, prov storiface.Prover) *CommitBatcher { b := &CommitBatcher{ api: api, maddr: maddr, @@ -363,7 +369,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa goodFunds := big.Add(maxFee, needFunds) - from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, needFunds) + from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, needFunds) if err != nil { return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } @@ -458,7 +464,7 @@ func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, av goodFunds := big.Add(collateral, big.Int(b.feeCfg.MaxCommitGasFee)) - from, _, err := b.addrSel(b.mctx, mi, api.CommitAddr, goodFunds, collateral) + from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.CommitAddr, goodFunds, collateral) if err != nil { return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err) } diff --git a/storage/pipeline/commit_batch_test.go b/storage/pipeline/commit_batch_test.go index a05b9e9cd..439ee0ed1 100644 --- a/storage/pipeline/commit_batch_test.go +++ b/storage/pipeline/commit_batch_test.go @@ -4,6 +4,7 @@ package sealing_test import ( "bytes" "context" + "github.com/filecoin-project/lotus/storage/ctladdr" "sort" "sync" "testing" @@ -37,9 +38,9 @@ func TestCommitBatcher(t *testing.T) { ctx := context.Background() - as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + as := asel(func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { return t0123, big.Zero(), nil - } + }) maxBatch := miner5.MaxAggregatedSectors minBatch := miner5.MinAggregatedSectors @@ -438,3 +439,11 @@ var dummySmsg = &types.SignedMessage{ }, Signature: crypto.Signature{Type: crypto.SigTypeBLS}, } + +type asel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) + +func (s asel) AddressFor(ctx context.Context, _ ctladdr.NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + return s(ctx, mi, use, goodFunds, minFunds) +} + +var _ pipeline.AddressSelector = asel(nil) diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index 51f319f2a..99de10fd2 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -107,6 +107,21 @@ func (mr *MockSealingAPIMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interfac return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2) } +// StateAccountKey mocks base method. +func (m *MockSealingAPI) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateAccountKey indicates an expected call of StateAccountKey. +func (mr *MockSealingAPIMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockSealingAPI)(nil).StateAccountKey), arg0, arg1, arg2) +} + // StateComputeDataCID mocks base method. func (m *MockSealingAPI) StateComputeDataCID(arg0 context.Context, arg1 address.Address, arg2 abi.RegisteredSealProof, arg3 []abi.DealID, arg4 types.TipSetKey) (cid.Cid, error) { m.ctrl.T.Helper() @@ -391,3 +406,33 @@ func (mr *MockSealingAPIMockRecorder) StateWaitMsg(arg0, arg1, arg2, arg3, arg4 mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateWaitMsg", reflect.TypeOf((*MockSealingAPI)(nil).StateWaitMsg), arg0, arg1, arg2, arg3, arg4) } + +// WalletBalance mocks base method. +func (m *MockSealingAPI) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletBalance indicates an expected call of WalletBalance. +func (mr *MockSealingAPIMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockSealingAPI)(nil).WalletBalance), arg0, arg1) +} + +// WalletHas mocks base method. +func (m *MockSealingAPI) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletHas", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletHas indicates an expected call of WalletHas. +func (mr *MockSealingAPIMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockSealingAPI)(nil).WalletHas), arg0, arg1) +} diff --git a/storage/pipeline/mocks/mock_commit_batcher.go b/storage/pipeline/mocks/mock_commit_batcher.go index 3d7e753ad..d61fde912 100644 --- a/storage/pipeline/mocks/mock_commit_batcher.go +++ b/storage/pipeline/mocks/mock_commit_batcher.go @@ -73,6 +73,36 @@ func (mr *MockCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2 in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2) } +// StateAccountKey mocks base method. +func (m *MockCommitBatcherApi) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateAccountKey indicates an expected call of StateAccountKey. +func (mr *MockCommitBatcherApiMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateAccountKey), arg0, arg1, arg2) +} + +// StateLookupID mocks base method. +func (m *MockCommitBatcherApi) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateLookupID", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateLookupID indicates an expected call of StateLookupID. +func (mr *MockCommitBatcherApiMockRecorder) StateLookupID(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateLookupID", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateLookupID), arg0, arg1, arg2) +} + // StateMinerAvailableBalance mocks base method. func (m *MockCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) { m.ctrl.T.Helper() @@ -147,3 +177,33 @@ func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1, mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3) } + +// WalletBalance mocks base method. +func (m *MockCommitBatcherApi) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletBalance indicates an expected call of WalletBalance. +func (mr *MockCommitBatcherApiMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockCommitBatcherApi)(nil).WalletBalance), arg0, arg1) +} + +// WalletHas mocks base method. +func (m *MockCommitBatcherApi) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletHas", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletHas indicates an expected call of WalletHas. +func (mr *MockCommitBatcherApiMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockCommitBatcherApi)(nil).WalletHas), arg0, arg1) +} diff --git a/storage/pipeline/mocks/mock_precommit_batcher.go b/storage/pipeline/mocks/mock_precommit_batcher.go index dc9239a10..2f65e3e03 100644 --- a/storage/pipeline/mocks/mock_precommit_batcher.go +++ b/storage/pipeline/mocks/mock_precommit_batcher.go @@ -71,6 +71,36 @@ func (mr *MockPreCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2) } +// StateAccountKey mocks base method. +func (m *MockPreCommitBatcherApi) StateAccountKey(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateAccountKey", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateAccountKey indicates an expected call of StateAccountKey. +func (mr *MockPreCommitBatcherApiMockRecorder) StateAccountKey(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateAccountKey), arg0, arg1, arg2) +} + +// StateLookupID mocks base method. +func (m *MockPreCommitBatcherApi) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateLookupID", arg0, arg1, arg2) + ret0, _ := ret[0].(address.Address) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateLookupID indicates an expected call of StateLookupID. +func (mr *MockPreCommitBatcherApiMockRecorder) StateLookupID(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateLookupID", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateLookupID), arg0, arg1, arg2) +} + // StateMinerAvailableBalance mocks base method. func (m *MockPreCommitBatcherApi) StateMinerAvailableBalance(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (big.Int, error) { m.ctrl.T.Helper() @@ -115,3 +145,33 @@ func (mr *MockPreCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 in mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1) } + +// WalletBalance mocks base method. +func (m *MockPreCommitBatcherApi) WalletBalance(arg0 context.Context, arg1 address.Address) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletBalance", arg0, arg1) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletBalance indicates an expected call of WalletBalance. +func (mr *MockPreCommitBatcherApiMockRecorder) WalletBalance(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletBalance", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).WalletBalance), arg0, arg1) +} + +// WalletHas mocks base method. +func (m *MockPreCommitBatcherApi) WalletHas(arg0 context.Context, arg1 address.Address) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WalletHas", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// WalletHas indicates an expected call of WalletHas. +func (mr *MockPreCommitBatcherApiMockRecorder) WalletHas(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WalletHas", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).WalletHas), arg0, arg1) +} diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 0a27725df..cb34e7f6c 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -33,6 +33,12 @@ type PreCommitBatcherApi interface { StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) ChainHead(ctx context.Context) (*types.TipSet, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) + + // Address selector + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) } type preCommitEntry struct { @@ -44,7 +50,7 @@ type PreCommitBatcher struct { api PreCommitBatcherApi maddr address.Address mctx context.Context - addrSel AddrSel + addrSel AddressSelector feeCfg config.MinerFeeConfig getConfig GetSealingConfigFunc @@ -57,7 +63,7 @@ type PreCommitBatcher struct { lk sync.Mutex } -func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { +func NewPreCommitBatcher(mctx context.Context, maddr address.Address, api PreCommitBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *PreCommitBatcher { b := &PreCommitBatcher{ api: api, maddr: maddr, @@ -296,7 +302,7 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, goodFunds := big.Add(deposit, big.Int(b.feeCfg.MaxPreCommitGasFee)) - from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) + from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { return cid.Undef, xerrors.Errorf("no good address to send precommit message from: %w", err) } @@ -353,7 +359,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tsk types.TipSetKe goodFunds := big.Add(maxFee, needFunds) - from, _, err := b.addrSel(b.mctx, mi, api.PreCommitAddr, goodFunds, deposit) + from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err) } diff --git a/storage/pipeline/precommit_batch_test.go b/storage/pipeline/precommit_batch_test.go index e6b3f0f9e..d380acbeb 100644 --- a/storage/pipeline/precommit_batch_test.go +++ b/storage/pipeline/precommit_batch_test.go @@ -42,9 +42,9 @@ func TestPrecommitBatcher(t *testing.T) { ctx := context.Background() - as := func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + as := asel(func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { return t0123, big.Zero(), nil - } + }) maxBatch := miner6.PreCommitSectorBatchMaxSize diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 0786f61bc..072307bf5 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -2,7 +2,6 @@ package sealing import ( "context" - "github.com/filecoin-project/lotus/journal" "sync" "time" @@ -25,7 +24,9 @@ import ( lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/pipeline/sealiface" "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" @@ -64,16 +65,23 @@ type SealingAPI interface { StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) + + // Address selector + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) } type SectorStateNotifee func(before, after SectorInfo) -type AddrSel func(ctx context.Context, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) - type Events interface { ChainAt(ctx context.Context, hnd events.HeightHandler, rev events.RevertHandler, confidence int, h abi.ChainEpoch) error } +type AddressSelector interface { + AddressFor(ctx context.Context, a ctladdr.NodeApi, mi api.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) +} + type Sealing struct { Api SealingAPI DealInfo *CurrentDealInfoManager @@ -103,7 +111,7 @@ type Sealing struct { journal journal.Journal sealingEvtType journal.EventType notifee SectorStateNotifee - addrSel AddrSel + addrSel AddressSelector stats SectorStats @@ -152,7 +160,7 @@ type pendingPiece struct { accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } -func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, as AddrSel) *Sealing { +func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sealer.SectorManager, sc SectorIDCounter, verif storiface.Verifier, prov storiface.Prover, pcp PreCommitPolicy, gc GetSealingConfigFunc, journal journal.Journal, addrSel AddressSelector) *Sealing { s := &Sealing{ Api: api, DealInfo: &CurrentDealInfoManager{api}, @@ -176,11 +184,11 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events journal: journal, sealingEvtType: journal.RegisterEventType("storage", "sealing_states"), - addrSel: as, + addrSel: addrSel, - terminator: NewTerminationBatcher(mctx, maddr, api, as, fc, gc), - precommiter: NewPreCommitBatcher(mctx, maddr, api, as, fc, gc), - commiter: NewCommitBatcher(mctx, maddr, api, as, fc, gc, prov), + terminator: NewTerminationBatcher(mctx, maddr, api, addrSel, fc, gc), + precommiter: NewPreCommitBatcher(mctx, maddr, api, addrSel, fc, gc), + commiter: NewCommitBatcher(mctx, maddr, api, addrSel, fc, gc, prov), getConfig: gc, diff --git a/storage/pipeline/states_replica_update.go b/storage/pipeline/states_replica_update.go index 499048acb..3392b92e7 100644 --- a/storage/pipeline/states_replica_update.go +++ b/storage/pipeline/states_replica_update.go @@ -168,7 +168,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return nil } - from, _, err := m.addrSel(ctx.Context(), mi, api.CommitAddr, goodFunds, collateral) + from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral) if err != nil { log.Errorf("no good address to send replica update message from: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go index cc248a078..5ec8f077f 100644 --- a/storage/pipeline/states_sealing.go +++ b/storage/pipeline/states_sealing.go @@ -407,7 +407,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf goodFunds := big.Add(deposit, big.Int(m.feeCfg.MaxPreCommitGasFee)) - from, _, err := m.addrSel(ctx.Context(), mi, api.PreCommitAddr, goodFunds, deposit) + from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.PreCommitAddr, goodFunds, deposit) if err != nil { return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no good address to send precommit message from: %w", err)}) } @@ -667,7 +667,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo goodFunds := big.Add(collateral, big.Int(m.feeCfg.MaxCommitGasFee)) - from, _, err := m.addrSel(ctx.Context(), mi, api.CommitAddr, goodFunds, collateral) + from, _, err := m.addrSel.AddressFor(ctx.Context(), m.Api, mi, api.CommitAddr, goodFunds, collateral) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("no good address to send commit message from: %w", err)}) } diff --git a/storage/pipeline/terminate_batch.go b/storage/pipeline/terminate_batch.go index 0f65ca31a..9212cbc65 100644 --- a/storage/pipeline/terminate_batch.go +++ b/storage/pipeline/terminate_batch.go @@ -30,13 +30,19 @@ type TerminateBatcherApi interface { StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tsk types.TipSetKey) ([]api.Partition, error) + + // Address selector + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) } type TerminateBatcher struct { api TerminateBatcherApi maddr address.Address mctx context.Context - addrSel AddrSel + addrSel AddressSelector feeCfg config.MinerFeeConfig getConfig GetSealingConfigFunc @@ -49,7 +55,7 @@ type TerminateBatcher struct { lk sync.Mutex } -func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher { +func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddressSelector, feeCfg config.MinerFeeConfig, getConfig GetSealingConfigFunc) *TerminateBatcher { b := &TerminateBatcher{ api: api, maddr: maddr, @@ -217,7 +223,7 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { return nil, xerrors.Errorf("couldn't get miner info: %w", err) } - from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, big.Int(b.feeCfg.MaxTerminateGasFee), big.Int(b.feeCfg.MaxTerminateGasFee)) + from, _, err := b.addrSel.AddressFor(b.mctx, b.api, mi, api.TerminateSectorsAddr, big.Int(b.feeCfg.MaxTerminateGasFee), big.Int(b.feeCfg.MaxTerminateGasFee)) if err != nil { return nil, xerrors.Errorf("no good address found: %w", err) }