sealing pipeline: Don't adapt MpoolPushMessage

This commit is contained in:
Łukasz Magiera 2022-06-16 15:50:41 +02:00
parent 9c4d10ec73
commit 59a4fe1e56
13 changed files with 83 additions and 66 deletions

View File

@ -118,19 +118,6 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a
return s.delegate.StateSectorPreCommitInfo(ctx, maddr, sectorNumber, tsk)
}
func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) {
msg := types.Message{
To: to,
From: from,
Value: value,
Method: method,
Params: params,
}
smsg, err := s.delegate.MpoolPushMessage(ctx, &msg, &api.MessageSendSpec{MaxFee: maxFee})
if err != nil {
return cid.Undef, err
}
return smsg.Cid(), nil
func (s SealingAPIAdapter) MpoolPushMessage(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec) (*types.SignedMessage, error) {
return s.delegate.MpoolPushMessage(ctx, msg, mss)
}

View File

@ -35,7 +35,7 @@ var aggFeeDen = big.NewInt(100)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
@ -368,7 +368,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitAggregate, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.CommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}
@ -463,7 +463,7 @@ func (b *CommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo, av
return cid.Undef, xerrors.Errorf("no good address to send commit message from: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes())
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(b.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}

View File

@ -203,11 +203,11 @@ func TestCommitBatcher(t *testing.T) {
//s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(basefee, nil)
}
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.(*types.Message)
if batch {
var params miner5.ProveCommitAggregateParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params)))
for _, number := range expect {
set, err := params.SectorNumbers.IsSet(uint64(number))
require.NoError(t, err)
@ -215,10 +215,10 @@ func TestCommitBatcher(t *testing.T) {
}
} else {
var params miner5.ProveCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params)))
}
return true
})).Times(ti)
}), gomock.Any()).Return(dummySmsg, nil).Times(ti)
return nil
}
}
@ -393,14 +393,18 @@ func (f fakeProver) AggregateSealProofs(aggregateInfo prooftypes.AggregateSealVe
var _ ffiwrapper.Prover = &fakeProver{}
func makeBFTs(t *testing.T, basefee abi.TokenAmount, h abi.ChainEpoch) *types.TipSet {
var dummyAddr = func() address.Address {
a, _ := address.NewFromString("t00")
return a
}()
func makeBFTs(t *testing.T, basefee abi.TokenAmount, h abi.ChainEpoch) *types.TipSet {
dummyCid, _ := cid.Parse("bafkqaaa")
var ts, err = types.NewTipSet([]*types.BlockHeader{
{
Height: h,
Miner: a,
Miner: dummyAddr,
Parents: []cid.Cid{},
@ -426,3 +430,11 @@ func makeBFTs(t *testing.T, basefee abi.TokenAmount, h abi.ChainEpoch) *types.Ti
func makeTs(t *testing.T, h abi.ChainEpoch) *types.TipSet {
return makeBFTs(t, big.NewInt(0), h)
}
var dummySmsg = &types.SignedMessage{
Message: types.Message{
From: dummyAddr,
To: dummyAddr,
},
Signature: crypto.Signature{Type: crypto.SigTypeBLS},
}

View File

@ -92,19 +92,19 @@ func (mr *MockSealingAPIMockRecorder) ChainReadObj(arg0, arg1 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainReadObj", reflect.TypeOf((*MockSealingAPI)(nil).ChainReadObj), arg0, arg1)
}
// SendMsg mocks base method.
func (m *MockSealingAPI) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
// MpoolPushMessage mocks base method.
func (m *MockSealingAPI) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret := m.ctrl.Call(m, "MpoolPushMessage", arg0, arg1, arg2)
ret0, _ := ret[0].(*types.SignedMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockSealingAPIMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
// MpoolPushMessage indicates an expected call of MpoolPushMessage.
func (mr *MockSealingAPIMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockSealingAPI)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockSealingAPI)(nil).MpoolPushMessage), arg0, arg1, arg2)
}
// StateComputeDataCommitment mocks base method.

View File

@ -9,7 +9,6 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
@ -59,19 +58,19 @@ func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
// MpoolPushMessage mocks base method.
func (m *MockCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret := m.ctrl.Call(m, "MpoolPushMessage", arg0, arg1, arg2)
ret0, _ := ret[0].(*types.SignedMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
// MpoolPushMessage indicates an expected call of MpoolPushMessage.
func (mr *MockCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2)
}
// StateMinerAvailableBalance mocks base method.

View File

@ -9,10 +9,8 @@ import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
network "github.com/filecoin-project/go-state-types/network"
@ -58,19 +56,19 @@ func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomo
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockPreCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
// MpoolPushMessage mocks base method.
func (m *MockPreCommitBatcherApi) MpoolPushMessage(arg0 context.Context, arg1 *types.Message, arg2 *api.MessageSendSpec) (*types.SignedMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret := m.ctrl.Call(m, "MpoolPushMessage", arg0, arg1, arg2)
ret0, _ := ret[0].(*types.SignedMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockPreCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
// MpoolPushMessage indicates an expected call of MpoolPushMessage.
func (mr *MockPreCommitBatcherApiMockRecorder) MpoolPushMessage(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MpoolPushMessage", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).MpoolPushMessage), arg0, arg1, arg2)
}
// StateMinerAvailableBalance mocks base method.

View File

@ -28,7 +28,7 @@ import (
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi
type PreCommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
@ -301,7 +301,7 @@ func (b *PreCommitBatcher) processSingle(cfg sealiface.Config, mi api.MinerInfo,
return cid.Undef, xerrors.Errorf("no good address to send precommit message from: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.PreCommitSector, deposit, big.Int(b.feeCfg.MaxPreCommitGasFee), enc.Bytes())
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSector, deposit, big.Int(b.feeCfg.MaxPreCommitGasFee), enc.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("pushing message to mpool: %w", err)
}
@ -358,7 +358,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config, tok types.TipSetKe
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes())
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.PreCommitSectorBatch, needFunds, maxFee, enc.Bytes())
if err != nil {
return []sealiface.PreCommitBatchRes{res}, xerrors.Errorf("sending message failed: %w", err)
}

View File

@ -162,15 +162,15 @@ func TestPrecommitBatcher(t *testing.T) {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version14, nil)
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil)
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.(*types.Message)
var params miner6.PreCommitSectorBatchParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params)))
for s, number := range expect {
require.Equal(t, number, params.Sectors[s].SectorNumber)
}
return true
}))
}), gomock.Any()).Return(dummySmsg, nil)
return nil
}
}
@ -184,13 +184,13 @@ func TestPrecommitBatcher(t *testing.T) {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(api.MinerInfo{Owner: t0123, Worker: t0123}, nil)
for _, number := range expect {
numClone := number
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
s.EXPECT().MpoolPushMessage(gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.(*types.Message)
var params miner6.PreCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b.Params)))
require.Equal(t, numClone, params.SectorNumber)
return true
}))
}), gomock.Any()).Return(dummySmsg, nil)
}
return nil
}

View File

@ -58,7 +58,7 @@ type SealingAPI interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error)
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok types.TipSetKey) ([]api.Partition, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tok types.TipSetKey) (abi.Randomness, error)

View File

@ -173,7 +173,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec
log.Errorf("no good address to send replica update message from: %+v", err)
return ctx.Send(SectorSubmitReplicaUpdateFailed{})
}
mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, builtin.MethodsMiner.ProveReplicaUpdates, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveReplicaUpdates, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
log.Errorf("handleSubmitReplicaUpdate: error sending message: %+v", err)
return ctx.Send(SectorSubmitReplicaUpdateFailed{})

View File

@ -413,7 +413,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
}
log.Infof("submitting precommit for sector %d (deposit: %s): ", sector.SectorNumber, deposit)
mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, builtin.MethodsMiner.PreCommitSector, deposit, big.Int(m.feeCfg.MaxPreCommitGasFee), enc.Bytes())
mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.PreCommitSector, deposit, big.Int(m.feeCfg.MaxPreCommitGasFee), enc.Bytes())
if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}
@ -673,7 +673,7 @@ func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo
}
// TODO: check seed / ticket / deals are up to date
mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
mcid, err := sendMsg(ctx.Context(), m.Api, from, m.maddr, builtin.MethodsMiner.ProveCommitSector, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes())
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
}

View File

@ -26,7 +26,7 @@ import (
type TerminateBatcherApi interface {
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*lminer.SectorLocation, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok types.TipSetKey) ([]api.Partition, error)
@ -222,7 +222,7 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
return nil, xerrors.Errorf("no good address found: %w", err)
}
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, builtin.MethodsMiner.TerminateSectors, big.Zero(), big.Int(b.feeCfg.MaxTerminateGasFee), enc.Bytes())
mcid, err := sendMsg(b.mctx, b.api, from, b.maddr, builtin.MethodsMiner.TerminateSectors, big.Zero(), big.Int(b.feeCfg.MaxTerminateGasFee), enc.Bytes())
if err != nil {
return nil, xerrors.Errorf("sending message failed: %w", err)
}

View File

@ -4,12 +4,14 @@ import (
"context"
"math/bits"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
)
@ -90,3 +92,22 @@ func collateralSendAmount(ctx context.Context, api interface {
return collateral, nil
}
func sendMsg(ctx context.Context, sa interface {
MpoolPushMessage(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error)
}, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) {
msg := types.Message{
To: to,
From: from,
Value: value,
Method: method,
Params: params,
}
smsg, err := sa.MpoolPushMessage(ctx, &msg, &api.MessageSendSpec{MaxFee: maxFee})
if err != nil {
return cid.Undef, err
}
return smsg.Cid(), nil
}