adding TestDealWithMarketAndMinerNode
parent 99646d209e · commit 21b51328f9
extern/sector-storage/stores/remote.go (vendored, 2 changes)
@@ -53,6 +53,7 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa
 }
 
 func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
+	fmt.Printf("Creating NewRemote: %#v \n", auth)
 	return &Remote{
 		local: local,
 		index: index,
@@ -304,6 +305,7 @@ func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.Registe
 		return false, xerrors.Errorf("request: %w", err)
 	}
 	req.Header = r.auth.Clone()
+	fmt.Printf("req using header: %#v \n", r.auth)
 	req = req.WithContext(ctx)
 
 	resp, err := http.DefaultClient.Do(req)
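Both Printf calls above dump the auth header verbatim, which means the bearer token for the miner API lands in plain-text logs. They read as temporary debug lines (a sibling one further down carries a "remove me prior to merge" TODO), but if header logging were ever kept, a redacting variant would be safer. A minimal sketch; the helper name is ours, not part of this commit:

package main

import (
	"fmt"
	"net/http"
)

// debugHeader prints an http.Header with the Authorization value masked,
// so bearer tokens never end up in captured logs.
func debugHeader(prefix string, h http.Header) {
	masked := h.Clone()
	if masked.Get("Authorization") != "" {
		masked.Set("Authorization", "[redacted]")
	}
	fmt.Printf("%s: %#v\n", prefix, masked)
}

func main() {
	h := http.Header{"Authorization": []string{"Bearer secret-token"}}
	debugHeader("req using header", h)
}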
@@ -24,6 +24,51 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+func TestDealWithMarketAndMinerNode(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode")
+	}
+
+	kit.QuietMiningLogs()
+
+	oldDelay := policy.GetPreCommitChallengeDelay()
+	policy.SetPreCommitChallengeDelay(5)
+	t.Cleanup(func() {
+		policy.SetPreCommitChallengeDelay(oldDelay)
+	})
+
+	// For these tests where the block time is artificially short, just use
+	// a deal start epoch that is guaranteed to be far enough in the future
+	// so that the deal starts sealing in time
+	startEpoch := abi.ChainEpoch(2 << 12)
+
+	runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) {
+		api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me
+
+		client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC())
+
+		dh := kit.NewDealHarness(t, client, main, market)
+
+		runConcurrentDeals(t, dh, fullDealCyclesOpts{
+			n:             n,
+			fastRetrieval: fastRetrieval,
+			carExport:     carExport,
+			startEpoch:    startEpoch,
+		})
+	}
+
+	// TODO: add 2, 4, 8, more when this graphsync issue is fixed: https://github.com/ipfs/go-graphsync/issues/175#
+	cycles := []int{1}
+	for _, n := range cycles {
+		n := n
+		ns := fmt.Sprintf("%d", n)
+		t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) })
+		//t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) })
+		//t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, true, false) })
+		//t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
+	}
+}
+
 func TestDealCyclesConcurrent(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode")
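Two details of the new test are easy to miss. abi.ChainEpoch(2 << 12) is simply 8192 epochs, comfortably far in the future at test block times, and the n := n line re-declares the loop variable so each closure passed to t.Run captures its own copy (required before Go 1.22, where the loop variable was shared across iterations). A standalone sketch of that capture rule:

package main

import "fmt"

func main() {
	fns := make([]func(), 0, 3)
	for _, n := range []int{1, 2, 4} {
		n := n // without this, every closure sees the final n (pre-Go 1.22)
		fns = append(fns, func() { fmt.Println(n) })
	}
	for _, fn := range fns {
		fn() // prints 1, 2, 4 rather than 4, 4, 4
	}
}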
@@ -270,7 +270,7 @@ func startNodes(
 	handler, err := gateway.Handler(gwapi)
 	require.NoError(t, err)
 
-	srv, _ := kit.CreateRPCServer(t, handler)
+	srv, _ := kit.CreateRPCServer(t, handler, nil)
 
 	// Create a gateway client API that connects to the gateway server
 	var gapi api.Gateway
@@ -432,11 +432,14 @@ func (n *Ensemble) Start() *Ensemble {
 		cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
 
 		if m.options.mainMiner != nil {
-			token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:3])
+			token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:4])
 			require.NoError(n.t, err)
 
 			cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
 			cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
+
+			fmt.Println("config for market node, setting SectorIndexApiInfo to: ", cfg.Subsystems.SectorIndexApiInfo)
+			fmt.Println("config for market node, setting SealerApiInfo to: ", cfg.Subsystems.SealerApiInfo)
 		}
 
 		err = lr.SetConfig(func(raw interface{}) {
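The [:3] → [:4] change widens the token the market node obtains from the main miner's full node. In Lotus, api.AllPermissions is, to our knowledge, the ordered slice {read, write, sign, admin}, so taking four elements instead of three additionally grants admin, which the sealer and sector-index RPC calls need. A sketch of the idea (the slice contents are our assumption, not taken from this diff):

package main

import "fmt"

// Assumed ordering of Lotus JWT permissions, mirroring api.AllPermissions.
var allPermissions = []string{"read", "write", "sign", "admin"}

func main() {
	fmt.Println(allPermissions[:3]) // [read write sign]        — old token scope
	fmt.Println(allPermissions[:4]) // [read write sign admin]  — new token scope
}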
@@ -23,7 +23,7 @@ func EnsembleMinimal(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMin
 	return &full, &miner, ens
 }
 
-func EnsembleWithMarket(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
+func EnsembleWithMinerAndMarketNodes(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
 	eopts, nopts := siftOptions(t, opts)
 
 	var (
@@ -2,6 +2,8 @@ package kit
 
 import (
 	"context"
+	"fmt"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -13,8 +15,11 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func CreateRPCServer(t *testing.T, handler http.Handler) (*httptest.Server, multiaddr.Multiaddr) {
+func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) {
 	testServ := httptest.NewServer(handler)
+	if listener != nil {
+		testServ.Listener = listener
+	}
 	t.Cleanup(testServ.Close)
 	t.Cleanup(testServ.CloseClientConnections)
 
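One caveat worth noting: httptest.NewServer starts serving on its own listener immediately, so swapping testServ.Listener afterwards changes what srv.Listener.Addr() reports but does not rebind the already-running server. When the goal is to actually serve on a caller-supplied listener, the stdlib-documented shape is NewUnstartedServer. A minimal sketch of that alternative, under the assumption it fits this kit package; this is not what the commit does:

package kit

import (
	"net"
	"net/http"
	"net/http/httptest"
	"testing"
)

// createServerOn serves handler on the supplied listener, falling back to
// an ephemeral localhost port when listener is nil.
func createServerOn(t *testing.T, handler http.Handler, listener net.Listener) *httptest.Server {
	srv := httptest.NewUnstartedServer(handler)
	if listener != nil {
		srv.Listener.Close() // drop the auto-allocated listener
		srv.Listener = listener
	}
	srv.Start()
	t.Cleanup(srv.Close)
	return srv
}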
@@ -28,7 +33,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
 	handler, err := node.FullNodeHandler(f.FullNode, false)
 	require.NoError(t, err)
 
-	srv, maddr := CreateRPCServer(t, handler)
+	srv, maddr := CreateRPCServer(t, handler, nil)
 
 	cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
 	require.NoError(t, err)
@@ -42,9 +47,11 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
 	handler, err := node.MinerHandler(m.StorageMiner, false)
 	require.NoError(t, err)
 
-	srv, maddr := CreateRPCServer(t, handler)
+	srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
 
-	cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v0", nil)
+	fmt.Println("creating RPC server for", m.ActorAddr, "at: ", srv.Listener.Addr().String())
+	url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
+	cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)
 	require.NoError(t, err)
 	t.Cleanup(stop)
 
@@ -33,7 +33,7 @@ func init() {
 	policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
 }
 
-// TestPaymentChannels does a basic test to exercise the payment channel CLI
+// TestPaymentChannelsBasic does a basic test to exercise the payment channel CLI
 // commands
 func TestPaymentChannelsBasic(t *testing.T) {
 	_ = os.Setenv("BELLMAN_NO_GPU", "1")
@@ -680,6 +680,7 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
 }
 
 func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
+	fmt.Printf("setting RemoteStorage: %#v \n", sa) // TODO(anteva): remove me prior to merge to master
 	return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
 }
 
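For reference, the SealerApiInfo and SectorIndexApiInfo values set in Ensemble.Start above are plain "token:address" strings built with fmt.Sprintf("%s:%s", token, listenAddr). Lotus has its own helpers for consuming such API-info strings; the sketch below is only illustrative of the split, with the sample values invented:

package main

import (
	"fmt"
	"strings"
)

// splitApiInfo separates a "token:addr" string into its two halves.
// SplitN with n=2 keeps any later colons inside the address part.
func splitApiInfo(info string) (token, addr string) {
	parts := strings.SplitN(info, ":", 2)
	if len(parts) != 2 {
		return "", info
	}
	return parts[0], parts[1]
}

func main() {
	token, addr := splitApiInfo("eyJhbGci-example-token:/ip4/127.0.0.1/tcp/2345/http")
	fmt.Println(token, addr)
}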