package sealing_test

import (
	"bytes"
	"context"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/go-state-types/network"
	miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
	proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
	"github.com/filecoin-project/lotus/extern/storage-sealing/mocks"
	"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
)
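
// TestCommitBatcher exercises sealing.CommitBatcher against a mocked chain API:
// sectors are queued for commit, and flushing the batcher is expected to send
// either a single aggregated ProveCommitAggregate message or one
// ProveCommitSector message per sector, depending on how many sectors are pending.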
func TestCommitBatcher(t *testing.T) {
	t0123, err := address.NewFromString("t0123")
	require.NoError(t, err)

	ctx := context.Background()

	as := func(ctx context.Context, mi miner.MinerInfo, use api.AddrUse, goodFunds, minFunds abi.TokenAmount) (address.Address, abi.TokenAmount, error) {
		return t0123, big.Zero(), nil
	}

	maxBatch := miner5.MaxAggregatedSectors
	minBatch := miner5.MinAggregatedSectors

	cfg := func() (sealiface.Config, error) {
		return sealiface.Config{
			MaxWaitDealsSectors:       2,
			MaxSealingSectors:         0,
			MaxSealingSectorsForDeals: 0,
			WaitDealsDelay:            time.Hour * 6,
			AlwaysKeepUnsealedCopy:    true,

			BatchPreCommits:     true,
			MinPreCommitBatch:   1,
			MaxPreCommitBatch:   miner5.PreCommitSectorBatchMaxSize,
			PreCommitBatchWait:  24 * time.Hour,
			PreCommitBatchSlack: 3 * time.Hour,

			AggregateCommits: true,
			MinCommitBatch:   minBatch,
			MaxCommitBatch:   maxBatch,
			CommitBatchWait:  24 * time.Hour,
			CommitBatchSlack: 1 * time.Hour,

			TerminateBatchMin:  1,
			TerminateBatchMax:  100,
			TerminateBatchWait: 5 * time.Minute,
		}, nil
	}

	type promise func(t *testing.T)
	type action func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise
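
	// actions composes several actions into one, collecting any promises they
	// return so they can all be awaited later.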
	actions := func(as ...action) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			var ps []promise
			for _, a := range as {
				p := a(t, s, pcb)
				if p != nil {
					ps = append(ps, p)
				}
			}

			if len(ps) > 0 {
				return func(t *testing.T) {
					for _, p := range ps {
						p(t)
					}
				}
			}
			return nil
		}
	}
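
	// addSector calls AddCommit for sector sn in a goroutine (the call blocks until
	// the batch containing the sector is sent) and returns a promise that waits for
	// it to finish and asserts the sector is in the result.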
	addSector := func(sn abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			var pcres sealiface.CommitBatchRes
			var pcerr error
			done := sync.Mutex{}
			done.Lock()

			si := sealing.SectorInfo{
				SectorNumber: sn,
			}

			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
			s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
				PreCommitDeposit: big.Zero(),
			}, nil)

			go func() {
				defer done.Unlock()
				pcres, pcerr = pcb.AddCommit(ctx, si, sealing.AggregateInput{
					Info: proof5.AggregateSealVerifyInfo{
						Number: sn,
					},
				})
			}()

			return func(t *testing.T) {
				done.Lock()
				require.NoError(t, pcerr)
				require.Empty(t, pcres.Error)
				require.Contains(t, pcres.Sectors, si.SectorNumber)
			}
		}
	}

	addSectors := func(sectors []abi.SectorNumber) action {
		as := make([]action, len(sectors))
		for i, sector := range sectors {
			as[i] = addSector(sector)
		}
		return actions(as...)
	}
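
	// waitPending blocks until the batcher reports exactly n pending sectors.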
	waitPending := func(n int) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			require.Eventually(t, func() bool {
				p, err := pcb.Pending(ctx)
				require.NoError(t, err)
				return len(p) == n
			}, time.Second*5, 10*time.Millisecond)

			return nil
		}
	}
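
	// expectSend registers mock expectations for committing the given sectors:
	// one ProveCommitAggregate message when the count reaches the minimum batch
	// size, otherwise one ProveCommitSector message per sector.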
	expectSend := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			s.EXPECT().StateMinerInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(miner.MinerInfo{Owner: t0123, Worker: t0123}, nil)

			ti := len(expect)
			batch := false
			if ti >= minBatch {
				batch = true
				ti = 1
			}
			s.EXPECT().ChainHead(gomock.Any()).Return(nil, abi.ChainEpoch(1), nil)
			s.EXPECT().StateSectorPreCommitInfo(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&miner.SectorPreCommitOnChainInfo{
				PreCommitDeposit: big.Zero(),
			}, nil).Times(len(expect))
			s.EXPECT().StateMinerInitialPledgeCollateral(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(big.Zero(), nil).Times(len(expect))
			if batch {
				s.EXPECT().StateNetworkVersion(gomock.Any(), gomock.Any()).Return(network.Version13, nil)
				s.EXPECT().ChainBaseFee(gomock.Any(), gomock.Any()).Return(big.NewInt(2000), nil)
			}

			s.EXPECT().SendMsg(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), funMatcher(func(i interface{}) bool {
				b := i.([]byte)
				if batch {
					var params miner5.ProveCommitAggregateParams
					require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
					for _, number := range expect {
						set, err := params.SectorNumbers.IsSet(uint64(number))
						require.NoError(t, err)
						require.True(t, set)
					}
				} else {
					var params miner5.ProveCommitSectorParams
					require.NoError(t, params.UnmarshalCBOR(bytes.NewReader(b)))
				}
				return true
			})).Times(ti)
			return nil
		}
	}
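
	// flush registers the expected sends, forces the batcher to flush, and checks
	// that the returned results cover exactly the expected sectors.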
	flush := func(expect []abi.SectorNumber) action {
		return func(t *testing.T, s *mocks.MockCommitBatcherApi, pcb *sealing.CommitBatcher) promise {
			_ = expectSend(expect)(t, s, pcb)

			batch := len(expect) >= minBatch

			r, err := pcb.Flush(ctx)
			require.NoError(t, err)
			if batch {
				require.Len(t, r, 1)
				require.Empty(t, r[0].Error)
				sort.Slice(r[0].Sectors, func(i, j int) bool {
					return r[0].Sectors[i] < r[0].Sectors[j]
				})
				require.Equal(t, expect, r[0].Sectors)
			} else {
				require.Len(t, r, len(expect))
				for _, res := range r {
					require.Len(t, res.Sectors, 1)
					require.Empty(t, res.Error)
				}
				sort.Slice(r, func(i, j int) bool {
					return r[i].Sectors[0] < r[j].Sectors[0]
				})
				for i, res := range r {
					require.Equal(t, abi.SectorNumber(i), res.Sectors[0])
				}
			}

			return nil
		}
	}
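
	// getSectors returns the sector numbers 0..n-1.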
	getSectors := func(n int) []abi.SectorNumber {
		out := make([]abi.SectorNumber, n)
		for i := range out {
			out[i] = abi.SectorNumber(i)
		}
		return out
	}
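
	// Each test case is a sequence of actions; any promises they return are
	// awaited after all actions have run.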
	tcs := map[string]struct {
		actions []action
	}{
		"addSingle": {
			actions: []action{
				addSector(0),
				waitPending(1),
				flush([]abi.SectorNumber{0}),
			},
		},
		"addTwo": {
			actions: []action{
				addSectors(getSectors(2)),
				waitPending(2),
				flush(getSectors(2)),
			},
		},
		"addAte": {
			actions: []action{
				addSectors(getSectors(8)),
				waitPending(8),
				flush(getSectors(8)),
			},
		},
		"addMax": {
			actions: []action{
				expectSend(getSectors(maxBatch)),
				addSectors(getSectors(maxBatch)),
			},
		},
	}

	for name, tc := range tcs {
		tc := tc

		t.Run(name, func(t *testing.T) {
			// create the gomock controller
			mockCtrl := gomock.NewController(t)
			// when the test is done, assert expectations on all mock objects
			defer mockCtrl.Finish()

			// create the mocks
			pcapi := mocks.NewMockCommitBatcherApi(mockCtrl)
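
			// fc (fee config) and the funMatcher helper used in expectSend are shared
			// test fixtures defined elsewhere in this package, not in this file.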
			pcb := sealing.NewCommitBatcher(ctx, t0123, pcapi, as, fc, cfg, &fakeProver{})

			var promises []promise

			for _, a := range tc.actions {
				p := a(t, pcapi, pcb)
				if p != nil {
					promises = append(promises, p)
				}
			}

			for _, p := range promises {
				p(t)
			}

			err := pcb.Stop(ctx)
			require.NoError(t, err)
		})
	}
}
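
// fakeProver satisfies ffiwrapper.Prover with a stub aggregate proof, so the
// batcher can build ProveCommitAggregate messages without computing real proofs.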
type fakeProver struct{}

func (f fakeProver) AggregateSealProofs(aggregateInfo proof5.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) {
	return []byte("Trust me, I'm a proof"), nil
}

var _ ffiwrapper.Prover = &fakeProver{}