diff --git a/api/api_storage.go b/api/api_storage.go index b1c106c00..502ebeab2 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -137,7 +137,7 @@ type SectorInfo struct { type SealedRef struct { SectorID abi.SectorNumber - Offset uint64 + Offset abi.PaddedPieceSize Size abi.UnpaddedPieceSize } diff --git a/api/cbor_gen.go b/api/cbor_gen.go index f4d06d917..3414aaf8c 100644 --- a/api/cbor_gen.go +++ b/api/cbor_gen.go @@ -317,7 +317,7 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error { if maj != cbg.MajUnsignedInt { return fmt.Errorf("wrong type for uint64 field") } - t.Offset = uint64(extra) + t.Offset = abi.PaddedPieceSize(extra) } // t.Size (abi.UnpaddedPieceSize) (uint64) diff --git a/api/test/deals.go b/api/test/deals.go index 31224ecd2..3b274977d 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -129,7 +129,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, ctx, miner, client, deal) + waitDealSealed(t, ctx, miner, client, deal, false) // Retrieval info, err := client.ClientGetDealInfo(ctx, *deal) @@ -138,6 +138,83 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data) } +func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { + _ = os.Setenv("BELLMAN_NO_GPU", "1") + + ctx := context.Background() + n, sn := b(t, 1, oneMiner) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + + addrinfo, err := client.NetAddrsListen(ctx) + if err != nil { + t.Fatal(err) + } + + if err := miner.NetConnect(ctx, addrinfo); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + + mine := int64(1) + done := make(chan struct{}) + + go func() { + defer close(done) + for atomic.LoadInt64(&mine) == 1 { + time.Sleep(blocktime) + if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil { + t.Error(err) + } + } + }() + + { + data1 := make([]byte, 800) + rand.New(rand.NewSource(int64(3))).Read(data1) + r := bytes.NewReader(data1) + + + fcid1, err := client.ClientImportLocal(ctx, r) + if err != nil { + t.Fatal(err) + } + + data2 := make([]byte, 800) + rand.New(rand.NewSource(int64(9))).Read(data2) + r2 := bytes.NewReader(data2) + + fcid2, err := client.ClientImportLocal(ctx, r2) + if err != nil { + t.Fatal(err) + } + + deal1 := startDeal(t, ctx, miner, client, fcid1, true) + + // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this + time.Sleep(time.Second) + waitDealSealed(t, ctx, miner, client, deal1, true) + + deal2 := startDeal(t, ctx, miner, client, fcid2, true) + + time.Sleep(time.Second) + waitDealSealed(t, ctx, miner, client, deal2, false) + + // Retrieval + info, err := client.ClientGetDealInfo(ctx, *deal2) + require.NoError(t, err) + + rf, _ := miner.SectorsRefs(ctx) + fmt.Printf("refs: %+v\n", rf) + + testRetrieval(t, ctx, err, client, fcid2, &info.PieceCID, false, data2) + } + + atomic.AddInt64(&mine, -1) + fmt.Println("shutting down mining") + <-done +} + func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid { maddr, err := miner.ActorAddress(ctx) if err != nil { @@ -162,7 +239,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client return deal } -func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid) { +func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid, noseal bool) { loop: for { di, err := client.ClientGetDealInfo(ctx, *deal) @@ -171,6 +248,9 @@ loop: } switch di.State { case storagemarket.StorageDealSealing: + if noseal { + return + } startSealingWaiting(t, ctx, miner) case storagemarket.StorageDealProposalRejected: t.Fatal("deal rejected") diff --git a/api/test/mining.go b/api/test/mining.go index dcbd59dd1..d5c9ab6e7 100644 --- a/api/test/mining.go +++ b/api/test/mining.go @@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, ctx, provider, client, deal) + waitDealSealed(t, ctx, provider, client, deal, false) <-minedTwo diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 968ced4cf..86419df42 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -225,7 +225,7 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context if bestSi.State == sealing.UndefinedSectorState { return 0, 0, 0, xerrors.New("no sealed sector found") } - return uint64(best.SectorID), best.Offset, uint64(best.Size), nil + return uint64(best.SectorID), uint64(best.Offset.Unpadded()), uint64(best.Size), nil } func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error { diff --git a/node/node_test.go b/node/node_test.go index 6e2437437..bfa154512 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -491,6 +491,8 @@ func TestAPIDealFlowReal(t *testing.T) { logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("storageminer", "ERROR") + saminer.PreCommitChallengeDelay = 5 + t.Run("basic", func(t *testing.T) { test.TestDealFlow(t, builder, time.Second, false, false) }) @@ -498,6 +500,10 @@ func TestAPIDealFlowReal(t *testing.T) { t.Run("fast-retrieval", func(t *testing.T) { test.TestDealFlow(t, builder, time.Second, false, true) }) + + t.Run("retrieval-second", func(t *testing.T) { + test.TestSenondDealRetrieval(t, builder, time.Second) + }) } func TestDealMining(t *testing.T) { diff --git a/storage/sectorblocks/blocks.go b/storage/sectorblocks/blocks.go index a09305ed2..af4fa52d2 100644 --- a/storage/sectorblocks/blocks.go +++ b/storage/sectorblocks/blocks.go @@ -64,7 +64,7 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks { return sbc } -func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset uint64, size abi.UnpaddedPieceSize) error { +func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error { st.keyLk.Lock() // TODO: make this multithreaded defer st.keyLk.Unlock() @@ -102,7 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize return 0, err } - err = st.writeRef(d.DealID, sn, offset, size) + err = st.writeRef(d.DealID, sn, abi.PaddedPieceSize(offset), size) if err != nil { return 0, xerrors.Errorf("writeRef: %w", err) }