diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go
index c1b51a4a4..c49dddbb4 100644
--- a/extern/sector-storage/stores/remote.go
+++ b/extern/sector-storage/stores/remote.go
@@ -53,6 +53,7 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storifa
 }
 
 func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler partialFileHandler) *Remote {
+	fmt.Printf("Creating NewRemote: %#v \n", auth)
 	return &Remote{
 		local: local,
 		index: index,
@@ -304,6 +305,7 @@ func (r *Remote) checkAllocated(ctx context.Context, url string, spt abi.Registe
 		return false, xerrors.Errorf("request: %w", err)
 	}
 	req.Header = r.auth.Clone()
+	fmt.Printf("req using header: %#v \n", r.auth)
 	req = req.WithContext(ctx)
 
 	resp, err := http.DefaultClient.Do(req)
diff --git a/itests/deals_test.go b/itests/deals_test.go
index 59437eccd..6a05e69b7 100644
--- a/itests/deals_test.go
+++ b/itests/deals_test.go
@@ -24,6 +24,51 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+func TestDealWithMarketAndMinerNode(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping test in short mode")
+	}
+
+	kit.QuietMiningLogs()
+
+	oldDelay := policy.GetPreCommitChallengeDelay()
+	policy.SetPreCommitChallengeDelay(5)
+	t.Cleanup(func() {
+		policy.SetPreCommitChallengeDelay(oldDelay)
+	})
+
+	// For these tests where the block time is artificially short, just use
+	// a deal start epoch that is guaranteed to be far enough in the future
+	// so that the deal starts sealing in time
+	startEpoch := abi.ChainEpoch(2 << 12)
+
+	runTest := func(t *testing.T, n int, fastRetrieval bool, carExport bool) {
+		api.RunningNodeType = api.NodeMiner // TODO(anteva): fix me
+
+		client, main, market, _ := kit.EnsembleWithMinerAndMarketNodes(t, kit.ThroughRPC())
+
+		dh := kit.NewDealHarness(t, client, main, market)
+
+		runConcurrentDeals(t, dh, fullDealCyclesOpts{
+			n:             n,
+			fastRetrieval: fastRetrieval,
+			carExport:     carExport,
+			startEpoch:    startEpoch,
+		})
+	}
+
+	// TODO: add 2, 4, 8, more when this graphsync issue is fixed: https://github.com/ipfs/go-graphsync/issues/175#
+	cycles := []int{1}
+	for _, n := range cycles {
+		n := n
+		ns := fmt.Sprintf("%d", n)
+		t.Run(ns+"-fastretrieval-CAR", func(t *testing.T) { runTest(t, n, true, true) })
+		//t.Run(ns+"-fastretrieval-NoCAR", func(t *testing.T) { runTest(t, n, true, false) })
+		//t.Run(ns+"-stdretrieval-CAR", func(t *testing.T) { runTest(t, n, false, true) })
+		//t.Run(ns+"-stdretrieval-NoCAR", func(t *testing.T) { runTest(t, n, false, false) })
+	}
+}
+
 func TestDealCyclesConcurrent(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode")
diff --git a/itests/gateway_test.go b/itests/gateway_test.go
index 15edc3c24..73c5c31b4 100644
--- a/itests/gateway_test.go
+++ b/itests/gateway_test.go
@@ -270,7 +270,7 @@ func startNodes(
 	handler, err := gateway.Handler(gwapi)
 	require.NoError(t, err)
 
-	srv, _ := kit.CreateRPCServer(t, handler)
+	srv, _ := kit.CreateRPCServer(t, handler, nil)
 
 	// Create a gateway client API that connects to the gateway server
 	var gapi api.Gateway
diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go
index 860fb5783..be6679121 100644
--- a/itests/kit/ensemble.go
+++ b/itests/kit/ensemble.go
@@ -432,11 +432,14 @@ func (n *Ensemble) Start() *Ensemble {
 			cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
 
 			if m.options.mainMiner != nil {
-				token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:3])
+				token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions[:4])
 				require.NoError(n.t, err)
 
 				cfg.Subsystems.SectorIndexApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
 				cfg.Subsystems.SealerApiInfo = fmt.Sprintf("%s:%s", token, m.options.mainMiner.ListenAddr)
+
+				fmt.Println("config for market node, setting SectorIndexApiInfo to: ", cfg.Subsystems.SectorIndexApiInfo)
+				fmt.Println("config for market node, setting SealerApiInfo to: ", cfg.Subsystems.SealerApiInfo)
 			}
 
 			err = lr.SetConfig(func(raw interface{}) {
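Note on the AuthNew change above: lotus's api.AllPermissions is ordered read, write, sign, admin, so moving from [:3] to [:4] gives the token that the market node presents to the main miner admin scope as well, the same auth header that the remote.go printfs above are tracing. A minimal standalone sketch of the slicing, where allPermissions is a hypothetical local stand-in for api.AllPermissions and the ordering is an assumption based on api/permissioned.go:

package main

import (
	"fmt"

	"github.com/filecoin-project/go-jsonrpc/auth"
)

// allPermissions mirrors the assumed ordering of api.AllPermissions:
// read, write, sign, admin.
var allPermissions = []auth.Permission{"read", "write", "sign", "admin"}

func main() {
	fmt.Println("token scope with [:3]:", allPermissions[:3]) // read, write, sign
	fmt.Println("token scope with [:4]:", allPermissions[:4]) // adds admin
}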
diff --git a/itests/kit/ensemble_presets.go b/itests/kit/ensemble_presets.go
index bc287770b..afc7c17f2 100644
--- a/itests/kit/ensemble_presets.go
+++ b/itests/kit/ensemble_presets.go
@@ -23,7 +23,7 @@ func EnsembleMinimal(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMin
 	return &full, &miner, ens
 }
 
-func EnsembleWithMarket(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
+func EnsembleWithMinerAndMarketNodes(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMiner, *TestMiner, *Ensemble) {
 	eopts, nopts := siftOptions(t, opts)
 
 	var (
diff --git a/itests/kit/rpc.go b/itests/kit/rpc.go
index dab45df07..9b5951af6 100644
--- a/itests/kit/rpc.go
+++ b/itests/kit/rpc.go
@@ -2,6 +2,8 @@ package kit
 
 import (
 	"context"
+	"fmt"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"testing"
@@ -13,8 +15,13 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func CreateRPCServer(t *testing.T, handler http.Handler) (*httptest.Server, multiaddr.Multiaddr) {
-	testServ := httptest.NewServer(handler)
+func CreateRPCServer(t *testing.T, handler http.Handler, listener net.Listener) (*httptest.Server, multiaddr.Multiaddr) {
+	testServ := httptest.NewUnstartedServer(handler)
+	if listener != nil {
+		testServ.Listener.Close() // swap in the caller's pre-bound listener before serving starts
+		testServ.Listener = listener
+	}
+	testServ.Start()
 	t.Cleanup(testServ.Close)
 	t.Cleanup(testServ.CloseClientConnections)
 
@@ -28,7 +35,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
 	handler, err := node.FullNodeHandler(f.FullNode, false)
 	require.NoError(t, err)
 
-	srv, maddr := CreateRPCServer(t, handler)
+	srv, maddr := CreateRPCServer(t, handler, nil)
 
 	cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
 	require.NoError(t, err)
@@ -42,9 +49,11 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
 	handler, err := node.MinerHandler(m.StorageMiner, false)
 	require.NoError(t, err)
 
-	srv, maddr := CreateRPCServer(t, handler)
+	srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
 
-	cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v0", nil)
+	fmt.Println("creating RPC server for", m.ActorAddr, "at: ", srv.Listener.Addr().String())
+	url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
+	cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)
 	require.NoError(t, err)
 	t.Cleanup(stop)
 
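CreateRPCServer's new listener parameter lets minerRpc serve the miner's API on a listener the ensemble bound earlier (m.RemoteListener), so the address is fixed and shareable before the server starts. A self-contained sketch of that pre-bound-listener pattern using only the standard library (the handler and address here are placeholders):

package main

import (
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Bind first: the address is now known and can be written into config
	// or handed to peers before anything is served on it.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}

	srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	}))
	srv.Listener.Close() // discard the auto-bound listener
	srv.Listener = ln    // serve on the pre-bound one instead
	srv.Start()
	defer srv.Close()

	fmt.Println("serving on", srv.Listener.Addr())
}

Note that the swap only takes effect before Start: replacing the Listener field of a server that httptest.NewServer has already started serving has no effect, which is why CreateRPCServer above goes through NewUnstartedServer.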
diff --git a/itests/paych_cli_test.go b/itests/paych_cli_test.go
index 495f33ef2..91b7e305d 100644
--- a/itests/paych_cli_test.go
+++ b/itests/paych_cli_test.go
@@ -33,7 +33,7 @@ func init() {
 	policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
 }
 
-// TestPaymentChannels does a basic test to exercise the payment channel CLI
+// TestPaymentChannelsBasic does a basic test to exercise the payment channel CLI
 // commands
 func TestPaymentChannelsBasic(t *testing.T) {
 	_ = os.Setenv("BELLMAN_NO_GPU", "1")
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index 7d36670af..b63281e67 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -680,6 +680,7 @@ func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStora
 }
 
 func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sectorstorage.StorageAuth, sc sectorstorage.SealerConfig) *stores.Remote {
+	fmt.Printf("setting RemoteStorage: %#v \n", sa) // TODO(anteva): remove me prior to merge to master
 	return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
 }
 
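For context on the header these temporary printfs trace: sectorstorage.StorageAuth is an http.Header carrying the miner's API token; RemoteStorage hands it to stores.NewRemote, and Remote clones it onto each outgoing fetch request (the checkAllocated hunk at the top). A standalone sketch of that flow, where the StorageAuth type and bearer-token layout mirror lotus's pattern but should be treated as assumptions:

package main

import (
	"fmt"
	"net/http"
)

// StorageAuth stands in for sectorstorage.StorageAuth, which in lotus is
// an http.Header wrapper around the miner's API token.
type StorageAuth http.Header

func storageAuthWithToken(token string) StorageAuth {
	headers := http.Header{}
	headers.Add("Authorization", "Bearer "+token)
	return StorageAuth(headers)
}

func main() {
	sa := storageAuthWithToken("example-token") // hypothetical token

	// Remote.checkAllocated clones this header onto each fetch request,
	// which is what the debug printfs in remote.go are observing.
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:2345/remote", nil)
	if err != nil {
		panic(err)
	}
	req.Header = http.Header(sa).Clone()
	fmt.Printf("req using header: %#v\n", req.Header)
}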