dealpublisher: Use address selector
This commit is contained in:
parent
8f5c0c60f5
commit
ee97aa0a8a
@ -279,8 +279,8 @@ type AddrUse int
|
||||
const (
|
||||
PreCommitAddr AddrUse = iota
|
||||
CommitAddr
|
||||
PoStAddr
|
||||
DealPublishAddr
|
||||
PoStAddr
|
||||
|
||||
TerminateSectorsAddr
|
||||
)
|
||||
|
@ -11,9 +11,13 @@ import (
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet"
|
||||
"github.com/filecoin-project/lotus/itests/kit"
|
||||
"github.com/filecoin-project/lotus/markets/storageadapter"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -28,16 +32,34 @@ func TestPublishDealsBatching(t *testing.T) {
|
||||
|
||||
kit.QuietMiningLogs()
|
||||
|
||||
opts := node.Override(new(*storageadapter.DealPublisher),
|
||||
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
|
||||
Period: publishPeriod,
|
||||
MaxDealsPerMsg: maxDealsPerMsg,
|
||||
}),
|
||||
publisherKey, err := wallet.GenerateKey(types.KTSecp256k1)
|
||||
require.NoError(t, err)
|
||||
|
||||
opts := node.Options(
|
||||
node.Override(new(*storageadapter.DealPublisher),
|
||||
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
|
||||
Period: publishPeriod,
|
||||
MaxDealsPerMsg: maxDealsPerMsg,
|
||||
}),
|
||||
),
|
||||
node.Override(new(*storage.AddressSelector), modules.AddressSelector(&config.MinerAddressConfig{
|
||||
DealPublishControl: []string{
|
||||
publisherKey.Address.String(),
|
||||
},
|
||||
DisableOwnerFallback: true,
|
||||
DisableWorkerFallback: true,
|
||||
})),
|
||||
kit.LatestActorsAt(-1),
|
||||
)
|
||||
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(opts))
|
||||
client, miner, ens := kit.EnsembleMinimal(t, kit.Account(publisherKey, types.FromFil(10)), kit.MockProofs(), kit.ConstructorOpts(opts))
|
||||
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
|
||||
|
||||
_, err = client.WalletImport(ctx, &publisherKey.KeyInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
kit.SetControlAddresses(t, client, miner, publisherKey.Address)
|
||||
|
||||
dh := kit.NewDealHarness(t, client, miner)
|
||||
|
||||
// Starts a deal and waits until it's published
|
||||
@ -93,6 +115,8 @@ func TestPublishDealsBatching(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
|
||||
}
|
||||
|
||||
require.Equal(t, publisherKey.Address.String(), msg.From.String())
|
||||
}
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
|
43
itests/kit/control.go
Normal file
43
itests/kit/control.go
Normal file
@ -0,0 +1,43 @@
|
||||
package kit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
addr "github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
func SetControlAddresses(t *testing.T, client *TestFullNode, w *TestMiner, addrs ...addr.Address) {
|
||||
ctx := context.TODO()
|
||||
|
||||
mi, err := client.StateMinerInfo(ctx, w.ActorAddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
cwp := &miner2.ChangeWorkerAddressParams{
|
||||
NewWorker: mi.Worker,
|
||||
NewControlAddrs: addrs,
|
||||
}
|
||||
|
||||
sp, err := actors.SerializeParams(cwp)
|
||||
require.NoError(t, err)
|
||||
|
||||
smsg, err := client.MpoolPushMessage(ctx, &types.Message{
|
||||
From: mi.Owner,
|
||||
To: w.ActorAddr,
|
||||
Method: miner.Methods.ChangeWorkerAddress,
|
||||
|
||||
Value: big.Zero(),
|
||||
Params: sp,
|
||||
}, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
WaitMsg(ctx, t, client, smsg.Cid())
|
||||
}
|
@ -2,6 +2,7 @@ package kit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/ipfs/go-cid"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
@ -27,8 +28,12 @@ func SendFunds(ctx context.Context, t *testing.T, sender *TestFullNode, recipien
|
||||
sm, err := sender.MpoolPushMessage(ctx, msg, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := sender.StateWaitMsg(ctx, sm.Cid(), 3, api.LookbackNoLimit, true)
|
||||
WaitMsg(ctx, t, sender, sm.Cid())
|
||||
}
|
||||
|
||||
func WaitMsg(ctx context.Context, t *testing.T, node *TestFullNode, msg cid.Cid) {
|
||||
res, err := node.StateWaitMsg(ctx, msg, 3, api.LookbackNoLimit, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.EqualValues(t, 0, res.Receipt.ExitCode, "did not successfully send funds")
|
||||
require.EqualValues(t, 0, res.Receipt.ExitCode, "message did not successfully execute")
|
||||
}
|
||||
|
@ -7,27 +7,33 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
)
|
||||
|
||||
type dealPublisherAPI interface {
|
||||
ChainHead(context.Context) (*types.TipSet, error)
|
||||
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
|
||||
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
|
||||
|
||||
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
||||
WalletHas(context.Context, address.Address) (bool, error)
|
||||
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||
}
|
||||
|
||||
// DealPublisher batches deal publishing so that many deals can be included in
|
||||
@ -40,6 +46,7 @@ type dealPublisherAPI interface {
|
||||
// publish message with all deals in the queue.
|
||||
type DealPublisher struct {
|
||||
api dealPublisherAPI
|
||||
as *storage.AddressSelector
|
||||
|
||||
ctx context.Context
|
||||
Shutdown context.CancelFunc
|
||||
@ -87,14 +94,14 @@ type PublishMsgConfig struct {
|
||||
func NewDealPublisher(
|
||||
feeConfig *config.MinerFeeConfig,
|
||||
publishMsgCfg PublishMsgConfig,
|
||||
) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
|
||||
return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
|
||||
) func(lc fx.Lifecycle, full api.FullNode, as *storage.AddressSelector) *DealPublisher {
|
||||
return func(lc fx.Lifecycle, full api.FullNode, as *storage.AddressSelector) *DealPublisher {
|
||||
maxFee := abi.NewTokenAmount(0)
|
||||
if feeConfig != nil {
|
||||
maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee)
|
||||
}
|
||||
publishSpec := &api.MessageSendSpec{MaxFee: maxFee}
|
||||
dp := newDealPublisher(full, publishMsgCfg, publishSpec)
|
||||
dp := newDealPublisher(full, as, publishMsgCfg, publishSpec)
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
dp.Shutdown()
|
||||
@ -107,12 +114,14 @@ func NewDealPublisher(
|
||||
|
||||
func newDealPublisher(
|
||||
dpapi dealPublisherAPI,
|
||||
as *storage.AddressSelector,
|
||||
publishMsgCfg PublishMsgConfig,
|
||||
publishSpec *api.MessageSendSpec,
|
||||
) *DealPublisher {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &DealPublisher{
|
||||
api: dpapi,
|
||||
as: as,
|
||||
ctx: ctx,
|
||||
Shutdown: cancel,
|
||||
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
|
||||
@ -345,9 +354,14 @@ func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal)
|
||||
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
||||
}
|
||||
|
||||
addr, _, err := p.as.AddressFor(p.ctx, p.api, mi, api.DealPublishAddr, big.Zero(), big.Zero())
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("selecting address for publishing deals: %w", err)
|
||||
}
|
||||
|
||||
smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{
|
||||
To: market.Address,
|
||||
From: mi.Worker,
|
||||
From: addr,
|
||||
Value: types.NewInt(0),
|
||||
Method: market.Methods.PublishStorageDeals,
|
||||
Params: params,
|
||||
|
@ -94,7 +94,7 @@ func TestDealPublisher(t *testing.T) {
|
||||
dpapi := newDPAPI(t)
|
||||
|
||||
// Create a deal publisher
|
||||
dp := newDealPublisher(dpapi, PublishMsgConfig{
|
||||
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
|
||||
Period: tc.publishPeriod,
|
||||
MaxDealsPerMsg: tc.maxDealsPerMsg,
|
||||
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
|
||||
@ -134,7 +134,7 @@ func TestForcePublish(t *testing.T) {
|
||||
// Create a deal publisher
|
||||
start := time.Now()
|
||||
publishPeriod := time.Hour
|
||||
dp := newDealPublisher(dpapi, PublishMsgConfig{
|
||||
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
|
||||
Period: publishPeriod,
|
||||
MaxDealsPerMsg: 10,
|
||||
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
|
||||
@ -320,6 +320,22 @@ func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *
|
||||
return &types.SignedMessage{Message: *msg}, nil
|
||||
}
|
||||
|
||||
func (d *dpAPI) WalletBalance(ctx context.Context, a address.Address) (types.BigInt, error) {
|
||||
panic("don't call me")
|
||||
}
|
||||
|
||||
func (d *dpAPI) WalletHas(ctx context.Context, a address.Address) (bool, error) {
|
||||
panic("don't call me")
|
||||
}
|
||||
|
||||
func (d *dpAPI) StateAccountKey(ctx context.Context, a address.Address, key types.TipSetKey) (address.Address, error) {
|
||||
panic("don't call me")
|
||||
}
|
||||
|
||||
func (d *dpAPI) StateLookupID(ctx context.Context, a address.Address, key types.TipSetKey) (address.Address, error) {
|
||||
panic("don't call me")
|
||||
}
|
||||
|
||||
func getClientActor(t *testing.T) address.Address {
|
||||
return tutils.NewActorAddr(t, "client")
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
@ -24,6 +25,12 @@ type AddressSelector struct {
|
||||
}
|
||||
|
||||
func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
|
||||
if as == nil {
|
||||
// should only happen in some tests
|
||||
log.Warnw("smart address selection disabled, using worker address")
|
||||
return mi.Worker, big.Zero(), nil
|
||||
}
|
||||
|
||||
var addrs []address.Address
|
||||
switch use {
|
||||
case api.PreCommitAddr:
|
||||
|
Loading…
Reference in New Issue
Block a user