netbs: Add an integration test

This commit is contained in:
Łukasz Magiera 2022-11-07 15:14:56 +00:00
parent 53e43a402a
commit 888f97a35f
7 changed files with 127 additions and 3 deletions

View File

@ -896,6 +896,11 @@ workflows:
suite: itest-deals_publish
target: "./itests/deals_publish_test.go"
- test:
name: test-itest-deals_remote_retrieval
suite: itest-deals_remote_retrieval
target: "./itests/deals_remote_retrieval_test.go"
- test:
name: test-itest-deals_retry_deal_no_funds
suite: itest-deals_retry_deal_no_funds

Binary file not shown.

2
go.mod
View File

@ -63,6 +63,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.7.4
github.com/gorilla/websocket v1.5.0
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.1
@ -214,7 +215,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/huin/goupnp v1.0.3 // indirect

View File

@ -0,0 +1,104 @@
package itests
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"os"
"path"
"testing"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/ipld/go-car"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/itests/kit"
)
func TestNetStoreRetrieval(t *testing.T) {
kit.QuietMiningLogs()
blocktime := 5 * time.Millisecond
ctx := context.Background()
full, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blocktime)
time.Sleep(5 * time.Second)
// For these tests where the block time is artificially short, just use
// a deal start epoch that is guaranteed to be far enough in the future
// so that the deal starts sealing in time
dealStartEpoch := abi.ChainEpoch(2 << 12)
rseed := 7
dh := kit.NewDealHarness(t, full, miner, miner)
dealCid, res, _ := dh.MakeOnlineDeal(context.Background(), kit.MakeFullDealParams{
Rseed: rseed,
StartEpoch: dealStartEpoch,
UseCARFileForStorageDeal: true,
})
// create deal store
id := uuid.New()
rstore := bstore.NewMemorySync()
au, err := url.Parse(full.ListenURL)
require.NoError(t, err)
switch au.Scheme {
case "http":
au.Scheme = "ws"
case "https":
au.Scheme = "wss"
}
au.Path = path.Join(au.Path, "/rest/v0/store/"+id.String())
conn, _, err := websocket.DefaultDialer.Dial(au.String(), nil)
require.NoError(t, err)
_ = bstore.HandleNetBstoreWS(ctx, rstore, conn)
dh.PerformRetrievalWithOrder(ctx, dealCid, res.Root, false, func(offer api.QueryOffer, address address.Address) api.RetrievalOrder {
order := offer.Order(address)
order.RemoteStore = &id
return order
})
// check blockstore blocks
carv1FilePath, _ := kit.CreateRandomCARv1(t, rseed, 200)
cb, err := os.ReadFile(carv1FilePath)
require.NoError(t, err)
cr, err := car.NewCarReader(bytes.NewReader(cb))
require.NoError(t, err)
var blocks int
for {
cb, err := cr.Next()
if err == io.EOF {
fmt.Println("blocks: ", blocks)
return
}
require.NoError(t, err)
sb, err := rstore.Get(ctx, cb.Cid())
require.NoError(t, err)
require.EqualValues(t, cb.RawData(), sb.RawData())
blocks++
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -308,6 +309,12 @@ func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
}
func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, offers ...api.QueryOffer) (path string) {
return dh.PerformRetrievalWithOrder(ctx, deal, root, carExport, func(offer api.QueryOffer, a address.Address) api.RetrievalOrder {
return offer.Order(a)
}, offers...)
}
func (dh *DealHarness) PerformRetrievalWithOrder(ctx context.Context, deal *cid.Cid, root cid.Cid, carExport bool, makeOrder func(api.QueryOffer, address.Address) api.RetrievalOrder, offers ...api.QueryOffer) (path string) {
var offer api.QueryOffer
if len(offers) == 0 {
// perform retrieval.
@ -331,7 +338,9 @@ func (dh *DealHarness) PerformRetrieval(ctx context.Context, deal *cid.Cid, root
updates, err := dh.client.ClientGetRetrievalUpdates(updatesCtx)
require.NoError(dh.t, err)
retrievalRes, err := dh.client.ClientRetrieve(ctx, offer.Order(caddr))
order := makeOrder(offer, caddr)
retrievalRes, err := dh.client.ClientRetrieve(ctx, order)
require.NoError(dh.t, err)
consumeEvents:
for {
@ -357,6 +366,11 @@ consumeEvents:
}
cancel()
if order.RemoteStore != nil {
// if we're retrieving into a remote store, skip export
return ""
}
require.NoError(dh.t, dh.client.ClientExport(ctx,
api.ExportRef{
Root: root,

View File

@ -27,6 +27,7 @@ type TestFullNode struct {
// ListenAddr is the address on which an API server is listening, if an
// API server is created for this Node.
ListenAddr multiaddr.Multiaddr
ListenURL string
DefaultKey *key.Key
options nodeOpts

View File

@ -65,7 +65,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
t.Cleanup(stop)
f.ListenAddr, f.FullNode = maddr, cl
f.ListenAddr, f.ListenURL, f.FullNode = maddr, srv.URL, cl
return f
}