markets: Fix offset in LocatePieceForDealWithinSector

This commit is contained in:
Łukasz Magiera 2020-07-28 01:22:20 +02:00
parent d557c407c6
commit 10362ab9b4
7 changed files with 94 additions and 8 deletions

View File

@ -137,7 +137,7 @@ type SectorInfo struct {
type SealedRef struct { type SealedRef struct {
SectorID abi.SectorNumber SectorID abi.SectorNumber
Offset uint64 Offset abi.PaddedPieceSize
Size abi.UnpaddedPieceSize Size abi.UnpaddedPieceSize
} }

View File

@ -317,7 +317,7 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
if maj != cbg.MajUnsignedInt { if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field") return fmt.Errorf("wrong type for uint64 field")
} }
t.Offset = uint64(extra) t.Offset = abi.PaddedPieceSize(extra)
} }
// t.Size (abi.UnpaddedPieceSize) (uint64) // t.Size (abi.UnpaddedPieceSize) (uint64)

View File

@ -129,7 +129,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal) waitDealSealed(t, ctx, miner, client, deal, false)
// Retrieval // Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal) info, err := client.ClientGetDealInfo(ctx, *deal)
@ -138,6 +138,83 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data) testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data)
} }
func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background()
n, sn := b(t, 1, oneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
}
}()
{
data1 := make([]byte, 800)
rand.New(rand.NewSource(int64(3))).Read(data1)
r := bytes.NewReader(data1)
fcid1, err := client.ClientImportLocal(ctx, r)
if err != nil {
t.Fatal(err)
}
data2 := make([]byte, 800)
rand.New(rand.NewSource(int64(9))).Read(data2)
r2 := bytes.NewReader(data2)
fcid2, err := client.ClientImportLocal(ctx, r2)
if err != nil {
t.Fatal(err)
}
deal1 := startDeal(t, ctx, miner, client, fcid1, true)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true)
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal2)
require.NoError(t, err)
rf, _ := miner.SectorsRefs(ctx)
fmt.Printf("refs: %+v\n", rf)
testRetrieval(t, ctx, err, client, fcid2, &info.PieceCID, false, data2)
}
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid { func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid {
maddr, err := miner.ActorAddress(ctx) maddr, err := miner.ActorAddress(ctx)
if err != nil { if err != nil {
@ -162,7 +239,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
return deal return deal
} }
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid) { func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid, noseal bool) {
loop: loop:
for { for {
di, err := client.ClientGetDealInfo(ctx, *deal) di, err := client.ClientGetDealInfo(ctx, *deal)
@ -171,6 +248,9 @@ loop:
} }
switch di.State { switch di.State {
case storagemarket.StorageDealSealing: case storagemarket.StorageDealSealing:
if noseal {
return
}
startSealingWaiting(t, ctx, miner) startSealingWaiting(t, ctx, miner)
case storagemarket.StorageDealProposalRejected: case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected") t.Fatal("deal rejected")

View File

@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second) time.Sleep(time.Second)
waitDealSealed(t, ctx, provider, client, deal) waitDealSealed(t, ctx, provider, client, deal, false)
<-minedTwo <-minedTwo

View File

@ -225,7 +225,7 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context
if bestSi.State == sealing.UndefinedSectorState { if bestSi.State == sealing.UndefinedSectorState {
return 0, 0, 0, xerrors.New("no sealed sector found") return 0, 0, 0, xerrors.New("no sealed sector found")
} }
return uint64(best.SectorID), best.Offset, uint64(best.Size), nil return uint64(best.SectorID), uint64(best.Offset.Unpadded()), uint64(best.Size), nil
} }
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error { func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {

View File

@ -491,6 +491,8 @@ func TestAPIDealFlowReal(t *testing.T) {
logging.SetLogLevel("sub", "ERROR") logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR") logging.SetLogLevel("storageminer", "ERROR")
saminer.PreCommitChallengeDelay = 5
t.Run("basic", func(t *testing.T) { t.Run("basic", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, false) test.TestDealFlow(t, builder, time.Second, false, false)
}) })
@ -498,6 +500,10 @@ func TestAPIDealFlowReal(t *testing.T) {
t.Run("fast-retrieval", func(t *testing.T) { t.Run("fast-retrieval", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, true) test.TestDealFlow(t, builder, time.Second, false, true)
}) })
t.Run("retrieval-second", func(t *testing.T) {
test.TestSenondDealRetrieval(t, builder, time.Second)
})
} }
func TestDealMining(t *testing.T) { func TestDealMining(t *testing.T) {

View File

@ -64,7 +64,7 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
return sbc return sbc
} }
func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset uint64, size abi.UnpaddedPieceSize) error { func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error {
st.keyLk.Lock() // TODO: make this multithreaded st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock() defer st.keyLk.Unlock()
@ -102,7 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize
return 0, err return 0, err
} }
err = st.writeRef(d.DealID, sn, offset, size) err = st.writeRef(d.DealID, sn, abi.PaddedPieceSize(offset), size)
if err != nil { if err != nil {
return 0, xerrors.Errorf("writeRef: %w", err) return 0, xerrors.Errorf("writeRef: %w", err)
} }