// stm: #integration package itests import ( "context" "fmt" "io" "os" "testing" "time" "github.com/ipfs/go-cid" blocks "github.com/ipfs/go-libipfs/blocks" "github.com/ipld/go-car" "github.com/stretchr/testify/require" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/itests/kit" ) // use the mainnet carfile as text fixture: it will always be here // https://dweb.link/ipfs/bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2/8/1/8/1/0/1/0 var ( sourceCar = "../build/genesis/mainnet.car" carRoot, _ = cid.Parse("bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2") carCommp, _ = cid.Parse("baga6ea4seaqmrivgzei3fmx5qxtppwankmtou6zvigyjaveu3z2zzwhysgzuina") selectedCid, _ = cid.Parse("bafkqaetgnfwc6mjpon2g64tbm5sxa33xmvza") carPieceSize = abi.PaddedPieceSize(2097152) textSelector = api.Selector("8/1/8/1/0/1/0") textSelectorNonLink = api.Selector("8/1/8/1/0/1") textSelectorNonexistent = api.Selector("42") expectedResult = "fil/1/storagepower" ) func TestPartialRetrieval(t *testing.T) { //stm: @CHAIN_SYNCER_LOAD_GENESIS_001, @CHAIN_SYNCER_FETCH_TIPSET_001, //stm: @CHAIN_SYNCER_START_001, @CHAIN_SYNCER_SYNC_001, @BLOCKCHAIN_BEACON_VALIDATE_BLOCK_VALUES_01 //stm: @CHAIN_SYNCER_COLLECT_CHAIN_001, @CHAIN_SYNCER_COLLECT_HEADERS_001, @CHAIN_SYNCER_VALIDATE_TIPSET_001 //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 //stm: @CHAIN_INCOMING_HANDLE_INCOMING_BLOCKS_001, @CHAIN_INCOMING_VALIDATE_BLOCK_PUBSUB_001, @CHAIN_INCOMING_VALIDATE_MESSAGE_PUBSUB_001 //stm: @CLIENT_RETRIEVAL_RETRIEVE_001 ctx := context.Background() kit.QuietMiningLogs() client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MockProofs(), kit.SectorSize(512<<20)) dh := kit.NewDealHarness(t, client, miner, miner) ens.InterconnectAll().BeginMining(50 * time.Millisecond) _, err := client.ClientImport(ctx, api.FileRef{Path: sourceCar, IsCAR: true}) require.NoError(t, err) caddr, err := client.WalletDefaultAddress(ctx) require.NoError(t, err) // first test retrieval from local car, then do an actual deal for _, exportMerkleProof := range []bool{false, true} { for _, fullCycle := range []bool{false, true} { var retOrder api.RetrievalOrder var eref api.ExportRef if !fullCycle { eref.FromLocalCAR = sourceCar } else { dp := dh.DefaultStartDealParams() dp.Data = &storagemarket.DataRef{ // FIXME: figure out how to do this with an online partial transfer TransferType: storagemarket.TTManual, Root: carRoot, PieceCid: &carCommp, PieceSize: carPieceSize.Unpadded(), } proposalCid := dh.StartDeal(ctx, dp) // Wait for the deal to reach StorageDealCheckForAcceptance on the client cd, err := client.ClientGetDealInfo(ctx, *proposalCid) require.NoError(t, err) require.Eventually(t, func() bool { cd, _ := client.ClientGetDealInfo(ctx, *proposalCid) return cd.State == storagemarket.StorageDealCheckForAcceptance }, 30*time.Second, 1*time.Second, "actual deal status is %s", storagemarket.DealStates[cd.State]) err = miner.DealsImportData(ctx, *proposalCid, sourceCar) require.NoError(t, err) // Wait for the deal to be published, we should be able to start retrieval right away dh.WaitDealPublished(ctx, proposalCid) offers, err := client.ClientFindData(ctx, carRoot, nil) require.NoError(t, err) require.NotEmpty(t, offers, "no offers") retOrder = offers[0].Order(caddr) } retOrder.DataSelector = &textSelector eref.DAGs = append(eref.DAGs, api.DagSpec{ DataSelector: &textSelector, ExportMerkleProof: exportMerkleProof, }) eref.Root = carRoot // test retrieval of either data or constructing a partial selective-car for _, retrieveAsCar := range []bool{false, true} { outFile := t.TempDir() + string(os.PathSeparator) + "ret-file" + retOrder.Root.String() require.NoError(t, testGenesisRetrieval( ctx, client, retOrder, eref, &api.FileRef{ Path: outFile, IsCAR: retrieveAsCar, }, )) // UGH if I do not sleep here, I get things like: /* retrieval failed: Retrieve failed: there is an active retrieval deal with peer 12D3KooWK9fB9a3HZ4PQLVmEQ6pweMMn5CAyKtumB71CPTnuBDi6 for payload CID bafy2bzacecnamqgqmifpluoeldx7zzglxcljo6oja4vrmtj7432rphldpdmm2 (retrieval deal ID 1631259332180384709, state DealStatusFinalizingBlockstore) - existing deal must be cancelled before starting a new retrieval deal: github.com/filecoin-project/lotus/node/impl/client.(*API).ClientRetrieve /home/circleci/project/node/impl/client/client.go:774 */ time.Sleep(time.Second) } } } // ensure non-existent paths fail require.EqualError( t, testGenesisRetrieval( ctx, client, api.RetrievalOrder{ Root: carRoot, DataSelector: &textSelectorNonexistent, }, api.ExportRef{ Root: carRoot, FromLocalCAR: sourceCar, DAGs: []api.DagSpec{{DataSelector: &textSelectorNonexistent}}, }, &api.FileRef{}, ), fmt.Sprintf("parsing dag spec: path selection does not match a node within %s", carRoot), ) // ensure non-boundary retrievals fail require.EqualError( t, testGenesisRetrieval( ctx, client, api.RetrievalOrder{ Root: carRoot, DataSelector: &textSelectorNonLink, }, api.ExportRef{ Root: carRoot, FromLocalCAR: sourceCar, DAGs: []api.DagSpec{{DataSelector: &textSelectorNonLink}}, }, &api.FileRef{}, ), fmt.Sprintf("parsing dag spec: error while locating partial retrieval sub-root: unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", textSelectorNonLink), ) } func testGenesisRetrieval(ctx context.Context, client *kit.TestFullNode, retOrder api.RetrievalOrder, eref api.ExportRef, retRef *api.FileRef) error { if retOrder.Total.Nil() { retOrder.Total = big.Zero() } if retOrder.UnsealPrice.Nil() { retOrder.UnsealPrice = big.Zero() } if eref.FromLocalCAR == "" { rr, err := client.ClientRetrieve(ctx, retOrder) if err != nil { return err } eref.DealID = rr.DealID if err := client.ClientRetrieveWait(ctx, rr.DealID); err != nil { return xerrors.Errorf("retrieval wait: %w", err) } } err := client.ClientExport(ctx, eref, *retRef) if err != nil { return err } outFile, err := os.Open(retRef.Path) if err != nil { return err } defer outFile.Close() //nolint:errcheck var data []byte if !retRef.IsCAR { data, err = io.ReadAll(outFile) if err != nil { return err } } else { cr, err := car.NewCarReader(outFile) if err != nil { return err } if len(cr.Header.Roots) != 1 { return fmt.Errorf("expected a single root in result car, got %d", len(cr.Header.Roots)) } else if eref.DAGs[0].ExportMerkleProof && cr.Header.Roots[0].String() != carRoot.String() { return fmt.Errorf("expected root cid '%s', got '%s'", carRoot.String(), cr.Header.Roots[0].String()) } else if !eref.DAGs[0].ExportMerkleProof && cr.Header.Roots[0].String() != selectedCid.String() { return fmt.Errorf("expected root cid '%s', got '%s'", selectedCid.String(), cr.Header.Roots[0].String()) } blks := make([]blocks.Block, 0) for { b, err := cr.Next() if err == io.EOF { break } else if err != nil { return err } blks = append(blks, b) } if (eref.DAGs[0].ExportMerkleProof && len(blks) != 3) || (!eref.DAGs[0].ExportMerkleProof && len(blks) != 1) { return fmt.Errorf("expected a car file with 3/1 blocks, got one with %d instead", len(blks)) } data = blks[len(blks)-1].RawData() } if string(data) != expectedResult { return fmt.Errorf("retrieved data mismatch: expected '%s' got '%s'", expectedResult, data) } return nil }