lotus/markets/dagstore/lotusaccessor_test.go

226 lines
5.9 KiB
Go

package dagstore
import (
"bytes"
"context"
"io"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
)
const unsealedSectorID = abi.SectorNumber(1)
const sealedSectorID = abi.SectorNumber(2)
func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
unsealedSectorData := "unsealed"
sealedSectorData := "sealed"
mockData := map[abi.SectorNumber]string{
unsealedSectorID: unsealedSectorData,
sealedSectorID: sealedSectorData,
}
testCases := []struct {
name string
deals []abi.SectorNumber
fetchedData string
expectErr bool
}{{
// Expect error if there is no deal info for piece CID
name: "no deals",
expectErr: true,
}, {
// Expect the API to always fetch the unsealed deal (because it's
// cheaper than fetching the sealed deal)
name: "prefer unsealed deal",
deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID},
fetchedData: unsealedSectorData,
}, {
// Expect the API to unseal the data if there are no unsealed deals
name: "unseal if necessary",
deals: []abi.SectorNumber{sealedSectorID},
fetchedData: sealedSectorData,
}}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{
sectors: mockData,
}
api := NewLotusAccessor(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add deals to piece store
for _, sectorID := range tc.deals {
dealInfo := piecestore.DealInfo{
SectorID: sectorID,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
}
// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
}
// Check that the returned reader is for the correct piece
require.NoError(t, err)
bz, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, tc.fetchedData, string(bz))
})
}
}
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewLotusAccessor(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10
dealInfo := piecestore.DealInfo{
Length: 10,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
// Check that the data length is correct
len, err := api.GetUnpaddedCARSize(ctx, cid1)
require.NoError(t, err)
require.EqualValues(t, 10, len)
}
func TestThrottle(t *testing.T) {
MaxConcurrentStorageCalls = 3
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
ps := getPieceStore(t)
rpn := &mockRPN{
sectors: map[abi.SectorNumber]string{
unsealedSectorID: "foo",
},
}
api := NewLotusAccessor(ps, rpn)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10
dealInfo := piecestore.DealInfo{
SectorID: unsealedSectorID,
Length: 10,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
// hold the lock to block.
rpn.lk.Lock()
// fetch the piece concurrently.
errgrp, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 10; i++ {
errgrp.Go(func() error {
r, err := api.FetchUnsealedPiece(ctx, cid1)
if err == nil {
_ = r.Close()
}
return err
})
}
time.Sleep(500 * time.Millisecond)
require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled
// allow to proceed.
rpn.lk.Unlock()
// allow all to finish.
err = errgrp.Wait()
require.NoError(t, err)
require.EqualValues(t, 10, atomic.LoadInt32(&rpn.calls)) // throttled
}
func getPieceStore(t *testing.T) piecestore.PieceStore {
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
err = ps.Start(context.Background())
require.NoError(t, err)
return ps
}
type mockRPN struct {
calls int32 // guarded by atomic
lk sync.RWMutex // lock to simulate blocks.
sectors map[abi.SectorNumber]string
}
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
atomic.AddInt32(&m.calls, 1)
m.lk.RLock()
defer m.lk.RUnlock()
data, ok := m.sectors[sectorID]
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
}
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
return sectorID == unsealedSectorID, nil
}
func (m *mockRPN) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
panic("implement me")
}
func (m *mockRPN) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
panic("implement me")
}
func (m *mockRPN) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
panic("implement me")
}
func (m *mockRPN) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
panic("implement me")
}
var _ retrievalmarket.RetrievalProviderNode = (*mockRPN)(nil)