Unit tests for sector batchers

This commit is contained in:
Łukasz Magiera 2021-06-09 17:18:09 +02:00
parent 6f80d9f3c2
commit 06195bc8e1
7 changed files with 809 additions and 12 deletions

View File

@ -32,6 +32,8 @@ import (
const arp = abi.RegisteredAggregationProof_SnarkPackV1
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi
type CommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@ -44,9 +46,9 @@ type CommitBatcherApi interface {
}
type AggregateInput struct {
spt abi.RegisteredSealProof
info proof5.AggregateSealVerifyInfo
proof []byte
Spt abi.RegisteredSealProof
Info proof5.AggregateSealVerifyInfo
Proof []byte
}
type CommitBatcher struct {
@ -256,7 +258,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
res.Sectors = append(res.Sectors, id)
params.SectorNumbers.Set(uint64(id))
infos = append(infos, p.info)
infos = append(infos, p.Info)
}
sort.Slice(infos, func(i, j int) bool {
@ -264,7 +266,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
})
for _, info := range infos {
proofs = append(proofs, b.todo[info.Number].proof)
proofs = append(proofs, b.todo[info.Number].Proof)
}
mid, err := address.IDFromAddress(b.maddr)
@ -274,7 +276,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa
params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{
Miner: abi.ActorID(mid),
SealProof: b.todo[infos[0].Number].spt,
SealProof: b.todo[infos[0].Number].Spt,
AggregateProof: arp,
Infos: infos,
}, proofs)
@ -362,7 +364,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i
enc := new(bytes.Buffer)
params := &miner.ProveCommitSectorParams{
SectorNumber: sn,
Proof: info.proof,
Proof: info.Proof,
}
if err := params.MarshalCBOR(enc); err != nil {
@ -447,7 +449,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
for _, s := range b.todo {
res = append(res, abi.SectorID{
Miner: abi.ActorID(mid),
Number: s.info.Number,
Number: s.Info.Number,
})
}

View File

@ -0,0 +1,299 @@
package sealing_test
import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/network"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)
func TestCommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
ctx := context.Background()
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}
maxBatch := miner5.MaxAggregatedSectors
minBatch := miner5.MinAggregatedSectors
cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
AggregateCommits: true,
MinCommitBatch: minBatch,
MaxCommitBatch: maxBatch,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}
type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise
actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}
if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}
addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
var pcres sealiface.CommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()
si := sealing.SectorInfo{
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil)
go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
Info: proof5.AggregateSealVerifyInfo{
Number: sn,
},
})
}()
return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}
addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}
waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)
return nil
}
}
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
ti := len(expect)
batch := false
if ti >= minBatch {
batch = true
ti = 1
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
PreCommitDeposit: big.Zero(),
}, nil).Times(len(expect))
s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
if batch {
s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
}
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
if batch {
var params miner5.ProveCommitAggregateParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for _, number := range expect {
set, err := params.SectorNumbers.IsSet(uint64(number))
require.NoError(t, err)
require.True(t, set)
}
} else {
var params miner5.ProveCommitSectorParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
}
return true
})).Times(ti)
return nil
}
}
flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
batch := len(expect) >= minBatch
r, err := pcb.Flush(ctx)
require.NoError(t, err)
if batch {
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
} else {
require.Len(t, r, len(expect))
for _, res := range r {
require.Len(t, res.Sectors, 1)
require.Empty(t, res.Error)
}
sort.Slice(r, func(i, j int) bool {
return r[i].Sectors[0] < r[j].Sectors[0]
})
for i, res := range r {
require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
}
}
return nil
}
}
getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}
tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addAte": {
actions: []action{
addSectors(getSectors(8)),
waitPending(8),
flush(getSectors(8)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)
pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})
var promises []promise
for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}
for _, p := range promises {
p(t)
}
err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}
type fakeProver struct{}
func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
return []byte("Trust me, I'm a proof"), nil
}
var _ ffiwrapper.Prover = &fakeProver{}

View File

@ -0,0 +1,149 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: CommitBatcherApi)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
network "github.com/filecoin-project/go-state-types/network"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockCommitBatcherApi is a mock of CommitBatcherApi interface.
type MockCommitBatcherApi struct {
ctrl *gomock.Controller
recorder *MockCommitBatcherApiMockRecorder
}
// MockCommitBatcherApiMockRecorder is the mock recorder for MockCommitBatcherApi.
type MockCommitBatcherApiMockRecorder struct {
mock *MockCommitBatcherApi
}
// NewMockCommitBatcherApi creates a new mock instance.
func NewMockCommitBatcherApi(ctrl *gomock.Controller) *MockCommitBatcherApi {
mock := &MockCommitBatcherApi{ctrl: ctrl}
mock.recorder = &MockCommitBatcherApiMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockCommitBatcherApi) EXPECT() *MockCommitBatcherApiMockRecorder {
return m.recorder
}
// ChainBaseFee mocks base method.
func (m *MockCommitBatcherApi) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ChainBaseFee indicates an expected call of ChainBaseFee.
func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainBaseFee), arg0, arg1)
}
// ChainHead mocks base method.
func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(sealing.TipSetToken)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// ChainHead indicates an expected call of ChainHead.
func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}
// StateMinerInfo mocks base method.
func (m *MockCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
ret0, _ := ret[0].(miner.MinerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}
// StateMinerInitialPledgeCollateral mocks base method.
func (m *MockCommitBatcherApi) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(big.Int)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral.
func (mr *MockCommitBatcherApiMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3)
}
// StateNetworkVersion mocks base method.
func (m *MockCommitBatcherApi) StateNetworkVersion(arg0 context.Context, arg1 sealing.TipSetToken) (network.Version, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateNetworkVersion", arg0, arg1)
ret0, _ := ret[0].(network.Version)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateNetworkVersion indicates an expected call of StateNetworkVersion.
func (mr *MockCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1)
}
// StateSectorPreCommitInfo mocks base method.
func (m *MockCommitBatcherApi) StateSectorPreCommitInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateSectorPreCommitInfo", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(*miner.SectorPreCommitOnChainInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateSectorPreCommitInfo indicates an expected call of StateSectorPreCommitInfo.
func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3)
}

View File

@ -0,0 +1,87 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: PreCommitBatcherApi)
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
address "github.com/filecoin-project/go-address"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockPreCommitBatcherApi is a mock of PreCommitBatcherApi interface.
type MockPreCommitBatcherApi struct {
ctrl *gomock.Controller
recorder *MockPreCommitBatcherApiMockRecorder
}
// MockPreCommitBatcherApiMockRecorder is the mock recorder for MockPreCommitBatcherApi.
type MockPreCommitBatcherApiMockRecorder struct {
mock *MockPreCommitBatcherApi
}
// NewMockPreCommitBatcherApi creates a new mock instance.
func NewMockPreCommitBatcherApi(ctrl *gomock.Controller) *MockPreCommitBatcherApi {
mock := &MockPreCommitBatcherApi{ctrl: ctrl}
mock.recorder = &MockPreCommitBatcherApiMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockPreCommitBatcherApi) EXPECT() *MockPreCommitBatcherApiMockRecorder {
return m.recorder
}
// ChainHead mocks base method.
func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ChainHead", arg0)
ret0, _ := ret[0].(sealing.TipSetToken)
ret1, _ := ret[1].(abi.ChainEpoch)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// ChainHead indicates an expected call of ChainHead.
func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0)
}
// SendMsg mocks base method.
func (m *MockPreCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6)
ret0, _ := ret[0].(cid.Cid)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// SendMsg indicates an expected call of SendMsg.
func (mr *MockPreCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6)
}
// StateMinerInfo mocks base method.
func (m *MockPreCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2)
ret0, _ := ret[0].(miner.MinerInfo)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// StateMinerInfo indicates an expected call of StateMinerInfo.
func (mr *MockPreCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2)
}

View File

@ -25,6 +25,8 @@ import (
"github.com/filecoin-project/lotus/node/config"
)
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi
type PreCommitBatcherApi interface {
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
@ -243,7 +245,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCo
res.Msg = &mcid
log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo))
log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo))
return []sealiface.PreCommitBatchRes{res}, nil
}

View File

@ -0,0 +1,258 @@
package sealing_test
import (
"bytes"
"context"
"sort"
"sync"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/node/config"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
)
var fc = config.MinerFeeConfig{
MaxPreCommitGasFee: types.FIL(types.FromFil(1)),
MaxCommitGasFee: types.FIL(types.FromFil(1)),
MaxTerminateGasFee: types.FIL(types.FromFil(1)),
MaxPreCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
MaxCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))},
}
func TestPrecommitBatcher(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
ctx := context.Background()
as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
return t0123, big.Zero(), nil
}
maxBatch := miner5.PreCommitSectorBatchMaxSize
cfg := func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 0,
MaxSealingSectorsForDeals: 0,
WaitDealsDelay: time.Hour * 6,
AlwaysKeepUnsealedCopy: true,
BatchPreCommits: true,
MinPreCommitBatch: 1,
MaxPreCommitBatch: maxBatch,
PreCommitBatchWait: 24 * time.Hour,
PreCommitBatchSlack: 3 * time.Hour,
AggregateCommits: true,
MinCommitBatch: miner5.MinAggregatedSectors,
MaxCommitBatch: miner5.MaxAggregatedSectors,
CommitBatchWait: 24 * time.Hour,
CommitBatchSlack: 1 * time.Hour,
TerminateBatchMin: 1,
TerminateBatchMax: 100,
TerminateBatchWait: 5 * time.Minute,
}, nil
}
type promise func(t *testing.T)
type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise
actions := func(as ...action) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
var ps []promise
for _, a := range as {
p := a(t, s, pcb)
if p != nil {
ps = append(ps, p)
}
}
if len(ps) > 0 {
return func(t *testing.T) {
for _, p := range ps {
p(t)
}
}
}
return nil
}
}
addSector := func(sn abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
var pcres sealiface.PreCommitBatchRes
var pcerr error
done := sync.Mutex{}
done.Lock()
si := sealing.SectorInfo{
SectorNumber: sn,
}
s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
go func() {
defer done.Unlock()
pcres, pcerr = pcb.AddPreCommit(ctx, si, big.Zero(), &miner0.SectorPreCommitInfo{
SectorNumber: si.SectorNumber,
SealedCID: fakePieceCid(t),
DealIDs: nil,
Expiration: 0,
})
}()
return func(t *testing.T) {
done.Lock()
require.NoError(t, pcerr)
require.Empty(t, pcres.Error)
require.Contains(t, pcres.Sectors, si.SectorNumber)
}
}
}
addSectors := func(sectors []abi.SectorNumber) action {
as := make([]action, len(sectors))
for i, sector := range sectors {
as[i] = addSector(sector)
}
return actions(as...)
}
waitPending := func(n int) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
require.Eventually(t, func() bool {
p, err := pcb.Pending(ctx)
require.NoError(t, err)
return len(p) == n
}, time.Second*5, 10*time.Millisecond)
return nil
}
}
expectSend := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)
s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
b := i.([]byte)
var params miner5.PreCommitSectorBatchParams
require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
for s, number := range expect {
require.Equal(t, number, params.Sectors[s].SectorNumber)
}
return true
}))
return nil
}
}
flush := func(expect []abi.SectorNumber) action {
return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise {
_ = expectSend(expect)(t, s, pcb)
r, err := pcb.Flush(ctx)
require.NoError(t, err)
require.Len(t, r, 1)
require.Empty(t, r[0].Error)
sort.Slice(r[0].Sectors, func(i, j int) bool {
return r[0].Sectors[i] < r[0].Sectors[j]
})
require.Equal(t, expect, r[0].Sectors)
return nil
}
}
getSectors := func(n int) []abi.SectorNumber {
out := make([]abi.SectorNumber, n)
for i := range out {
out[i] = abi.SectorNumber(i)
}
return out
}
tcs := map[string]struct {
actions []action
}{
"addSingle": {
actions: []action{
addSector(0),
waitPending(1),
flush([]abi.SectorNumber{0}),
},
},
"addTwo": {
actions: []action{
addSectors(getSectors(2)),
waitPending(2),
flush(getSectors(2)),
},
},
"addMax": {
actions: []action{
expectSend(getSectors(maxBatch)),
addSectors(getSectors(maxBatch)),
},
},
}
for name, tc := range tcs {
tc := tc
t.Run(name, func(t *testing.T) {
// create go mock controller here
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create them mocks
pcapi := mocks.NewMockPreCommitBatcherApi(mockCtrl)
pcb := sealing.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg)
var promises []promise
for _, a := range tc.actions {
p := a(t, pcapi, pcb)
if p != nil {
promises = append(promises, p)
}
}
for _, p := range promises {
p(t)
}
err := pcb.Stop(ctx)
require.NoError(t, err)
})
}
}
type funMatcher func(interface{}) bool
func (funMatcher) Matches(interface{}) bool {
return true
}
func (funMatcher) String() string {
return "fun"
}

View File

@ -590,15 +590,15 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S
}
res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
info: proof.AggregateSealVerifyInfo{
Info: proof.AggregateSealVerifyInfo{
Number: sector.SectorNumber,
Randomness: sector.TicketValue,
InteractiveRandomness: sector.SeedValue,
SealedCID: *sector.CommR,
UnsealedCID: *sector.CommD,
},
proof: sector.Proof, // todo: this correct??
spt: sector.SectorType,
Proof: sector.Proof, // todo: this correct??
Spt: sector.SectorType,
})
if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})