diff --git a/extern/storage-sealing/commit_batch.go b/extern/storage-sealing/commit_batch.go index 819cb7fc7..0d7bd899f 100644 --- a/extern/storage-sealing/commit_batch.go +++ b/extern/storage-sealing/commit_batch.go @@ -32,6 +32,8 @@ import ( const arp = abi.RegisteredAggregationProof_SnarkPackV1 +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_commit_batcher.go -package=mocks . CommitBatcherApi + type CommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) @@ -44,9 +46,9 @@ type CommitBatcherApi interface { } type AggregateInput struct { - spt abi.RegisteredSealProof - info proof5.AggregateSealVerifyInfo - proof []byte + Spt abi.RegisteredSealProof + Info proof5.AggregateSealVerifyInfo + Proof []byte } type CommitBatcher struct { @@ -256,7 +258,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa res.Sectors = append(res.Sectors, id) params.SectorNumbers.Set(uint64(id)) - infos = append(infos, p.info) + infos = append(infos, p.Info) } sort.Slice(infos, func(i, j int) bool { @@ -264,7 +266,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa }) for _, info := range infos { - proofs = append(proofs, b.todo[info.Number].proof) + proofs = append(proofs, b.todo[info.Number].Proof) } mid, err := address.IDFromAddress(b.maddr) @@ -274,7 +276,7 @@ func (b *CommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.CommitBa params.AggregateProof, err = b.prover.AggregateSealProofs(proof5.AggregateSealVerifyProofAndInfos{ Miner: abi.ActorID(mid), - SealProof: b.todo[infos[0].Number].spt, + SealProof: b.todo[infos[0].Number].Spt, AggregateProof: arp, Infos: infos, }, proofs) @@ -362,7 +364,7 @@ func (b *CommitBatcher) processSingle(mi miner.MinerInfo, sn abi.SectorNumber, i enc := new(bytes.Buffer) params := &miner.ProveCommitSectorParams{ SectorNumber: sn, - Proof: info.proof, + Proof: info.Proof, } if err := params.MarshalCBOR(enc); err != nil { @@ -447,7 +449,7 @@ func (b *CommitBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) { for _, s := range b.todo { res = append(res, abi.SectorID{ Miner: abi.ActorID(mid), - Number: s.info.Number, + Number: s.Info.Number, }) } diff --git a/extern/storage-sealing/commit_batch_test.go b/extern/storage-sealing/commit_batch_test.go new file mode 100644 index 000000000..bbf09766f --- /dev/null +++ b/extern/storage-sealing/commit_batch_test.go @@ -0,0 +1,299 @@ +package sealing_test + +import ( + "bytes" + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/network" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/mocks" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" +) + +func TestCommitBatcher(t *testing.T) { + t0123, err := address.NewFromString("t0123") + require.NoError(t, err) + + ctx := context.Background() + + as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + return t0123, big.Zero(), nil + } + + maxBatch := miner5.MaxAggregatedSectors + minBatch := miner5.MinAggregatedSectors + + cfg := func() (sealiface.Config, error) { + return sealiface.Config{ + MaxWaitDealsSectors: 2, + MaxSealingSectors: 0, + MaxSealingSectorsForDeals: 0, + WaitDealsDelay: time.Hour * 6, + AlwaysKeepUnsealedCopy: true, + + BatchPreCommits: true, + MinPreCommitBatch: 1, + MaxPreCommitBatch: miner5.PreCommitSectorBatchMaxSize, + PreCommitBatchWait: 24 * time.Hour, + PreCommitBatchSlack: 3 * time.Hour, + + AggregateCommits: true, + MinCommitBatch: minBatch, + MaxCommitBatch: maxBatch, + CommitBatchWait: 24 * time.Hour, + CommitBatchSlack: 1 * time.Hour, + + TerminateBatchMin: 1, + TerminateBatchMax: 100, + TerminateBatchWait: 5 * time.Minute, + }, nil + } + + type promise func(t *testing.T) + type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise + + actions := func(as ...action) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { + var ps []promise + for _, a := range as { + p := a(t, s, pcb) + if p != nil { + ps = append(ps, p) + } + } + + if len(ps) > 0 { + return func(t *testing.T) { + for _, p := range ps { + p(t) + } + } + } + return nil + } + } + + addSector := func(sn abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { + var pcres sealiface.CommitBatchRes + var pcerr error + done := sync.Mutex{} + done.Lock() + + si := sealing.SectorInfo{ + SectorNumber: sn, + } + + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{ + PreCommitDeposit: big.Zero(), + }, nil) + + go func() { + defer done.Unlock() + pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{ + Info: proof5.AggregateSealVerifyInfo{ + Number: sn, + }, + }) + }() + + return func(t *testing.T) { + done.Lock() + require.NoError(t, pcerr) + require.Empty(t, pcres.Error) + require.Contains(t, pcres.Sectors, si.SectorNumber) + } + } + } + + addSectors := func(sectors []abi.SectorNumber) action { + as := make([]action, len(sectors)) + for i, sector := range sectors { + as[i] = addSector(sector) + } + return actions(as...) + } + + waitPending := func(n int) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { + require.Eventually(t, func() bool { + p, err := pcb.Pending(ctx) + require.NoError(t, err) + return len(p) == n + }, time.Second*5, 10*time.Millisecond) + + return nil + } + } + + expectSend := func(expect []abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { + s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil) + + ti := len(expect) + batch := false + if ti >= minBatch { + batch = true + ti = 1 + } + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{ + PreCommitDeposit: big.Zero(), + }, nil).Times(len(expect)) + s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect)) + if batch { + s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil) + s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil) + } + + s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool { + b := i.([]byte) + if batch { + var params miner5.ProveCommitAggregateParams + require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b))) + for _, number := range expect { + set, err := params.SectorNumbers.IsSet(uint64(number)) + require.NoError(t, err) + require.True(t, set) + } + } else { + var params miner5.ProveCommitSectorParams + require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b))) + } + return true + })).Times(ti) + return nil + } + } + + flush := func(expect []abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise { + _ = expectSend(expect)(t, s, pcb) + + batch := len(expect) >= minBatch + + r, err := pcb.Flush(ctx) + require.NoError(t, err) + if batch { + require.Len(t, r, 1) + require.Empty(t, r[0].Error) + sort.Slice(r[0].Sectors, func(i, j int) bool { + return r[0].Sectors[i] < r[0].Sectors[j] + }) + require.Equal(t, expect, r[0].Sectors) + } else { + require.Len(t, r, len(expect)) + for _, res := range r { + require.Len(t, res.Sectors, 1) + require.Empty(t, res.Error) + } + sort.Slice(r, func(i, j int) bool { + return r[i].Sectors[0] < r[j].Sectors[0] + }) + for i, res := range r { + require.Equal(t, abi.SectorNumber(i), res.Sectors[0]) + } + } + + return nil + } + } + + getSectors := func(n int) []abi.SectorNumber { + out := make([]abi.SectorNumber, n) + for i := range out { + out[i] = abi.SectorNumber(i) + } + return out + } + + tcs := map[string]struct { + actions []action + }{ + "addSingle": { + actions: []action{ + addSector(0), + waitPending(1), + flush([]abi.SectorNumber{0}), + }, + }, + "addTwo": { + actions: []action{ + addSectors(getSectors(2)), + waitPending(2), + flush(getSectors(2)), + }, + }, + "addAte": { + actions: []action{ + addSectors(getSectors(8)), + waitPending(8), + flush(getSectors(8)), + }, + }, + "addMax": { + actions: []action{ + expectSend(getSectors(maxBatch)), + addSectors(getSectors(maxBatch)), + }, + }, + } + + for name, tc := range tcs { + tc := tc + + t.Run(name, func(t *testing.T) { + // create go mock controller here + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + // create them mocks + pcapi := mocks.NewMockCommitBatcherApi(mockCtrl) + + pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{}) + + var promises []promise + + for _, a := range tc.actions { + p := a(t, pcapi, pcb) + if p != nil { + promises = append(promises, p) + } + } + + for _, p := range promises { + p(t) + } + + err := pcb.Stop(ctx) + require.NoError(t, err) + }) + } +} + +type fakeProver struct{} + +func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) { + return []byte("Trust me, I'm a proof"), nil +} + +var _ ffiwrapper.Prover = &fakeProver{} diff --git a/extern/storage-sealing/mocks/mock_commit_batcher.go b/extern/storage-sealing/mocks/mock_commit_batcher.go new file mode 100644 index 000000000..c4746e0eb --- /dev/null +++ b/extern/storage-sealing/mocks/mock_commit_batcher.go @@ -0,0 +1,149 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: CommitBatcherApi) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + address "github.com/filecoin-project/go-address" + abi "github.com/filecoin-project/go-state-types/abi" + big "github.com/filecoin-project/go-state-types/big" + network "github.com/filecoin-project/go-state-types/network" + miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" +) + +// MockCommitBatcherApi is a mock of CommitBatcherApi interface. +type MockCommitBatcherApi struct { + ctrl *gomock.Controller + recorder *MockCommitBatcherApiMockRecorder +} + +// MockCommitBatcherApiMockRecorder is the mock recorder for MockCommitBatcherApi. +type MockCommitBatcherApiMockRecorder struct { + mock *MockCommitBatcherApi +} + +// NewMockCommitBatcherApi creates a new mock instance. +func NewMockCommitBatcherApi(ctrl *gomock.Controller) *MockCommitBatcherApi { + mock := &MockCommitBatcherApi{ctrl: ctrl} + mock.recorder = &MockCommitBatcherApiMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCommitBatcherApi) EXPECT() *MockCommitBatcherApiMockRecorder { + return m.recorder +} + +// ChainBaseFee mocks base method. +func (m *MockCommitBatcherApi) ChainBaseFee(arg0 context.Context, arg1 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainBaseFee", arg0, arg1) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainBaseFee indicates an expected call of ChainBaseFee. +func (mr *MockCommitBatcherApiMockRecorder) ChainBaseFee(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainBaseFee", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainBaseFee), arg0, arg1) +} + +// ChainHead mocks base method. +func (m *MockCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainHead", arg0) + ret0, _ := ret[0].(sealing.TipSetToken) + ret1, _ := ret[1].(abi.ChainEpoch) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ChainHead indicates an expected call of ChainHead. +func (mr *MockCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockCommitBatcherApi)(nil).ChainHead), arg0) +} + +// SendMsg mocks base method. +func (m *MockCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + +// StateMinerInfo mocks base method. +func (m *MockCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2) + ret0, _ := ret[0].(miner.MinerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerInfo indicates an expected call of StateMinerInfo. +func (mr *MockCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2) +} + +// StateMinerInitialPledgeCollateral mocks base method. +func (m *MockCommitBatcherApi) StateMinerInitialPledgeCollateral(arg0 context.Context, arg1 address.Address, arg2 miner0.SectorPreCommitInfo, arg3 sealing.TipSetToken) (big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerInitialPledgeCollateral", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerInitialPledgeCollateral indicates an expected call of StateMinerInitialPledgeCollateral. +func (mr *MockCommitBatcherApiMockRecorder) StateMinerInitialPledgeCollateral(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInitialPledgeCollateral", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateMinerInitialPledgeCollateral), arg0, arg1, arg2, arg3) +} + +// StateNetworkVersion mocks base method. +func (m *MockCommitBatcherApi) StateNetworkVersion(arg0 context.Context, arg1 sealing.TipSetToken) (network.Version, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateNetworkVersion", arg0, arg1) + ret0, _ := ret[0].(network.Version) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateNetworkVersion indicates an expected call of StateNetworkVersion. +func (mr *MockCommitBatcherApiMockRecorder) StateNetworkVersion(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateNetworkVersion", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateNetworkVersion), arg0, arg1) +} + +// StateSectorPreCommitInfo mocks base method. +func (m *MockCommitBatcherApi) StateSectorPreCommitInfo(arg0 context.Context, arg1 address.Address, arg2 abi.SectorNumber, arg3 sealing.TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateSectorPreCommitInfo", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*miner.SectorPreCommitOnChainInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateSectorPreCommitInfo indicates an expected call of StateSectorPreCommitInfo. +func (mr *MockCommitBatcherApiMockRecorder) StateSectorPreCommitInfo(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateSectorPreCommitInfo", reflect.TypeOf((*MockCommitBatcherApi)(nil).StateSectorPreCommitInfo), arg0, arg1, arg2, arg3) +} diff --git a/extern/storage-sealing/mocks/mock_precommit_batcher.go b/extern/storage-sealing/mocks/mock_precommit_batcher.go new file mode 100644 index 000000000..4a5074027 --- /dev/null +++ b/extern/storage-sealing/mocks/mock_precommit_batcher.go @@ -0,0 +1,87 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/filecoin-project/lotus/extern/storage-sealing (interfaces: PreCommitBatcherApi) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + address "github.com/filecoin-project/go-address" + abi "github.com/filecoin-project/go-state-types/abi" + big "github.com/filecoin-project/go-state-types/big" + miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" +) + +// MockPreCommitBatcherApi is a mock of PreCommitBatcherApi interface. +type MockPreCommitBatcherApi struct { + ctrl *gomock.Controller + recorder *MockPreCommitBatcherApiMockRecorder +} + +// MockPreCommitBatcherApiMockRecorder is the mock recorder for MockPreCommitBatcherApi. +type MockPreCommitBatcherApiMockRecorder struct { + mock *MockPreCommitBatcherApi +} + +// NewMockPreCommitBatcherApi creates a new mock instance. +func NewMockPreCommitBatcherApi(ctrl *gomock.Controller) *MockPreCommitBatcherApi { + mock := &MockPreCommitBatcherApi{ctrl: ctrl} + mock.recorder = &MockPreCommitBatcherApiMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPreCommitBatcherApi) EXPECT() *MockPreCommitBatcherApiMockRecorder { + return m.recorder +} + +// ChainHead mocks base method. +func (m *MockPreCommitBatcherApi) ChainHead(arg0 context.Context) (sealing.TipSetToken, abi.ChainEpoch, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainHead", arg0) + ret0, _ := ret[0].(sealing.TipSetToken) + ret1, _ := ret[1].(abi.ChainEpoch) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ChainHead indicates an expected call of ChainHead. +func (mr *MockPreCommitBatcherApiMockRecorder) ChainHead(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainHead", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).ChainHead), arg0) +} + +// SendMsg mocks base method. +func (m *MockPreCommitBatcherApi) SendMsg(arg0 context.Context, arg1, arg2 address.Address, arg3 abi.MethodNum, arg4, arg5 big.Int, arg6 []byte) (cid.Cid, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret0, _ := ret[0].(cid.Cid) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockPreCommitBatcherApiMockRecorder) SendMsg(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).SendMsg), arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + +// StateMinerInfo mocks base method. +func (m *MockPreCommitBatcherApi) StateMinerInfo(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (miner.MinerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateMinerInfo", arg0, arg1, arg2) + ret0, _ := ret[0].(miner.MinerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateMinerInfo indicates an expected call of StateMinerInfo. +func (mr *MockPreCommitBatcherApiMockRecorder) StateMinerInfo(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateMinerInfo", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateMinerInfo), arg0, arg1, arg2) +} diff --git a/extern/storage-sealing/precommit_batch.go b/extern/storage-sealing/precommit_batch.go index 9439ae14c..d85a60bc6 100644 --- a/extern/storage-sealing/precommit_batch.go +++ b/extern/storage-sealing/precommit_batch.go @@ -25,6 +25,8 @@ import ( "github.com/filecoin-project/lotus/node/config" ) +//go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_precommit_batcher.go -package=mocks . PreCommitBatcherApi + type PreCommitBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) @@ -243,7 +245,7 @@ func (b *PreCommitBatcher) processBatch(cfg sealiface.Config) ([]sealiface.PreCo res.Msg = &mcid - log.Infow("Sent ProveCommitAggregate message", "cid", mcid, "from", from, "sectors", len(b.todo)) + log.Infow("Sent PreCommitSectorBatch message", "cid", mcid, "from", from, "sectors", len(b.todo)) return []sealiface.PreCommitBatchRes{res}, nil } diff --git a/extern/storage-sealing/precommit_batch_test.go b/extern/storage-sealing/precommit_batch_test.go new file mode 100644 index 000000000..141742c14 --- /dev/null +++ b/extern/storage-sealing/precommit_batch_test.go @@ -0,0 +1,258 @@ +package sealing_test + +import ( + "bytes" + "context" + "sort" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/mocks" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/node/config" + miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" +) + +var fc = config.MinerFeeConfig{ + MaxPreCommitGasFee: types.FIL(types.FromFil(1)), + MaxCommitGasFee: types.FIL(types.FromFil(1)), + MaxTerminateGasFee: types.FIL(types.FromFil(1)), + MaxPreCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))}, + MaxCommitBatchGasFee: config.BatchFeeConfig{Base: types.FIL(types.FromFil(3)), PerSector: types.FIL(types.FromFil(1))}, +} + +func TestPrecommitBatcher(t *testing.T) { + t0123, err := address.NewFromString("t0123") + require.NoError(t, err) + + ctx := context.Background() + + as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) { + return t0123, big.Zero(), nil + } + + maxBatch := miner5.PreCommitSectorBatchMaxSize + + cfg := func() (sealiface.Config, error) { + return sealiface.Config{ + MaxWaitDealsSectors: 2, + MaxSealingSectors: 0, + MaxSealingSectorsForDeals: 0, + WaitDealsDelay: time.Hour * 6, + AlwaysKeepUnsealedCopy: true, + + BatchPreCommits: true, + MinPreCommitBatch: 1, + MaxPreCommitBatch: maxBatch, + PreCommitBatchWait: 24 * time.Hour, + PreCommitBatchSlack: 3 * time.Hour, + + AggregateCommits: true, + MinCommitBatch: miner5.MinAggregatedSectors, + MaxCommitBatch: miner5.MaxAggregatedSectors, + CommitBatchWait: 24 * time.Hour, + CommitBatchSlack: 1 * time.Hour, + + TerminateBatchMin: 1, + TerminateBatchMax: 100, + TerminateBatchWait: 5 * time.Minute, + }, nil + } + + type promise func(t *testing.T) + type action func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise + + actions := func(as ...action) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise { + var ps []promise + for _, a := range as { + p := a(t, s, pcb) + if p != nil { + ps = append(ps, p) + } + } + + if len(ps) > 0 { + return func(t *testing.T) { + for _, p := range ps { + p(t) + } + } + } + return nil + } + } + + addSector := func(sn abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise { + var pcres sealiface.PreCommitBatchRes + var pcerr error + done := sync.Mutex{} + done.Lock() + + si := sealing.SectorInfo{ + SectorNumber: sn, + } + + s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil) + + go func() { + defer done.Unlock() + pcres, pcerr = pcb.AddPreCommit(ctx, si, big.Zero(), &miner0.SectorPreCommitInfo{ + SectorNumber: si.SectorNumber, + SealedCID: fakePieceCid(t), + DealIDs: nil, + Expiration: 0, + }) + }() + + return func(t *testing.T) { + done.Lock() + require.NoError(t, pcerr) + require.Empty(t, pcres.Error) + require.Contains(t, pcres.Sectors, si.SectorNumber) + } + } + } + + addSectors := func(sectors []abi.SectorNumber) action { + as := make([]action, len(sectors)) + for i, sector := range sectors { + as[i] = addSector(sector) + } + return actions(as...) + } + + waitPending := func(n int) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise { + require.Eventually(t, func() bool { + p, err := pcb.Pending(ctx) + require.NoError(t, err) + return len(p) == n + }, time.Second*5, 10*time.Millisecond) + + return nil + } + } + + expectSend := func(expect []abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise { + s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil) + s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool { + b := i.([]byte) + var params miner5.PreCommitSectorBatchParams + require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b))) + for s, number := range expect { + require.Equal(t, number, params.Sectors[s].SectorNumber) + } + return true + })) + return nil + } + } + + flush := func(expect []abi.SectorNumber) action { + return func(t *testing.T, s *mocks.MockPreCommitBatcherApi, pcb *sealing.PreCommitBatcher) promise { + _ = expectSend(expect)(t, s, pcb) + + r, err := pcb.Flush(ctx) + require.NoError(t, err) + require.Len(t, r, 1) + require.Empty(t, r[0].Error) + sort.Slice(r[0].Sectors, func(i, j int) bool { + return r[0].Sectors[i] < r[0].Sectors[j] + }) + require.Equal(t, expect, r[0].Sectors) + + return nil + } + } + + getSectors := func(n int) []abi.SectorNumber { + out := make([]abi.SectorNumber, n) + for i := range out { + out[i] = abi.SectorNumber(i) + } + return out + } + + tcs := map[string]struct { + actions []action + }{ + "addSingle": { + actions: []action{ + addSector(0), + waitPending(1), + flush([]abi.SectorNumber{0}), + }, + }, + "addTwo": { + actions: []action{ + addSectors(getSectors(2)), + waitPending(2), + flush(getSectors(2)), + }, + }, + "addMax": { + actions: []action{ + expectSend(getSectors(maxBatch)), + addSectors(getSectors(maxBatch)), + }, + }, + } + + for name, tc := range tcs { + tc := tc + + t.Run(name, func(t *testing.T) { + // create go mock controller here + mockCtrl := gomock.NewController(t) + // when test is done, assert expectations on all mock objects. + defer mockCtrl.Finish() + + // create them mocks + pcapi := mocks.NewMockPreCommitBatcherApi(mockCtrl) + + pcb := sealing.NewPreCommitBatcher(ctx, t0123, pcapi, as, fc, cfg) + + var promises []promise + + for _, a := range tc.actions { + p := a(t, pcapi, pcb) + if p != nil { + promises = append(promises, p) + } + } + + for _, p := range promises { + p(t) + } + + err := pcb.Stop(ctx) + require.NoError(t, err) + }) + } +} + +type funMatcher func(interface{}) bool + +func (funMatcher) Matches(interface{}) bool { + return true +} + +func (funMatcher) String() string { + return "fun" +} diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index c75b2af94..493cbdb98 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -590,15 +590,15 @@ func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector S } res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{ - info: proof.AggregateSealVerifyInfo{ + Info: proof.AggregateSealVerifyInfo{ Number: sector.SectorNumber, Randomness: sector.TicketValue, InteractiveRandomness: sector.SeedValue, SealedCID: *sector.CommR, UnsealedCID: *sector.CommD, }, - proof: sector.Proof, // todo: this correct?? - spt: sector.SectorType, + Proof: sector.Proof, // todo: this correct?? + Spt: sector.SectorType, }) if err != nil { return ctx.Send(SectorCommitFailed{xerrors.Errorf("queuing commit for aggregation failed: %w", err)})