Merge pull request #2614 from filecoin-project/fix/market-offset-handling

markets: Fix offset in LocatePieceForDealWithinSector
This commit is contained in:
Łukasz Magiera 2020-07-28 01:27:59 +02:00 committed by GitHub
commit 291d2fe2de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 115 additions and 30 deletions

View File

@ -123,11 +123,11 @@ type SectorInfo struct {
// On Chain Info
SealProof abi.RegisteredSealProof // The seal proof type implies the PoSt proof/s
Activation abi.ChainEpoch // Epoch during which the sector proof was accepted
Expiration abi.ChainEpoch // Epoch during which the sector expires
DealWeight abi.DealWeight // Integral of active deals over sector lifetime
VerifiedDealWeight abi.DealWeight // Integral of active verified deals over sector lifetime
InitialPledge abi.TokenAmount // Pledge collected to commit this sector
Activation abi.ChainEpoch // Epoch during which the sector proof was accepted
Expiration abi.ChainEpoch // Epoch during which the sector expires
DealWeight abi.DealWeight // Integral of active deals over sector lifetime
VerifiedDealWeight abi.DealWeight // Integral of active verified deals over sector lifetime
InitialPledge abi.TokenAmount // Pledge collected to commit this sector
// Expiration Info
OnTime abi.ChainEpoch
// non-zero if sector is faulty, epoch at which it will be permanently
@ -137,7 +137,7 @@ type SectorInfo struct {
type SealedRef struct {
SectorID abi.SectorNumber
Offset uint64
Offset abi.PaddedPieceSize
Size abi.UnpaddedPieceSize
}

View File

@ -216,16 +216,16 @@ type StorageMinerStruct struct {
PledgeSector func(context.Context) error `perm:"write"`
SectorsStatus func(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) `perm:"read"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"`
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
SectorsList func(context.Context) ([]abi.SectorNumber, error) `perm:"read"`
SectorsRefs func(context.Context) (map[string][]api.SealedRef, error) `perm:"read"`
SectorStartSealing func(context.Context, abi.SectorNumber) error `perm:"write"`
SectorSetSealDelay func(context.Context, time.Duration) error `perm:"write"`
SectorGetSealDelay func(context.Context) (time.Duration, error) `perm:"read"`
SectorSetExpectedSealDuration func(context.Context, time.Duration) error `perm:"write"`
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
WorkerConnect func(context.Context, string) error `perm:"admin"` // TODO: worker perm
WorkerStats func(context.Context) (map[uint64]storiface.WorkerStats, error) `perm:"admin"`

View File

@ -317,7 +317,7 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Offset = uint64(extra)
t.Offset = abi.PaddedPieceSize(extra)
}
// t.Size (abi.UnpaddedPieceSize) (uint64)

View File

@ -129,7 +129,7 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal)
waitDealSealed(t, ctx, miner, client, deal, false)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal)
@ -138,6 +138,82 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data)
}
func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
_ = os.Setenv("BELLMAN_NO_GPU", "1")
ctx := context.Background()
n, sn := b(t, 1, oneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}
if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, func(bool, error) {}); err != nil {
t.Error(err)
}
}
}()
{
data1 := make([]byte, 800)
rand.New(rand.NewSource(int64(3))).Read(data1)
r := bytes.NewReader(data1)
fcid1, err := client.ClientImportLocal(ctx, r)
if err != nil {
t.Fatal(err)
}
data2 := make([]byte, 800)
rand.New(rand.NewSource(int64(9))).Read(data2)
r2 := bytes.NewReader(data2)
fcid2, err := client.ClientImportLocal(ctx, r2)
if err != nil {
t.Fatal(err)
}
deal1 := startDeal(t, ctx, miner, client, fcid1, true)
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)
deal2 := startDeal(t, ctx, miner, client, fcid2, true)
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal2)
require.NoError(t, err)
rf, _ := miner.SectorsRefs(ctx)
fmt.Printf("refs: %+v\n", rf)
testRetrieval(t, ctx, err, client, fcid2, &info.PieceCID, false, data2)
}
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}
func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, fcid cid.Cid, fastRet bool) *cid.Cid {
maddr, err := miner.ActorAddress(ctx)
if err != nil {
@ -162,7 +238,7 @@ func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client
return deal
}
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid) {
func waitDealSealed(t *testing.T, ctx context.Context, miner TestStorageNode, client *impl.FullNodeAPI, deal *cid.Cid, noseal bool) {
loop:
for {
di, err := client.ClientGetDealInfo(ctx, *deal)
@ -171,6 +247,9 @@ loop:
}
switch di.State {
case storagemarket.StorageDealSealing:
if noseal {
return
}
startSealingWaiting(t, ctx, miner)
case storagemarket.StorageDealProposalRejected:
t.Fatal("deal rejected")

View File

@ -194,7 +194,7 @@ func TestDealMining(t *testing.T, b APIBuilder, blocktime time.Duration, carExpo
// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, provider, client, deal)
waitDealSealed(t, ctx, provider, client, deal, false)
<-minedTwo

View File

@ -225,7 +225,7 @@ func (n *ProviderNodeAdapter) LocatePieceForDealWithinSector(ctx context.Context
if bestSi.State == sealing.UndefinedSectorState {
return 0, 0, 0, xerrors.New("no sealed sector found")
}
return uint64(best.SectorID), best.Offset, uint64(best.Size), nil
return uint64(best.SectorID), uint64(best.Offset.Unpadded()), uint64(best.Size), nil
}
func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, cb storagemarket.DealSectorCommittedCallback) error {

View File

@ -147,14 +147,14 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid abi.SectorNumb
LastErr: info.LastErr,
Log: log,
// on chain info
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
SealProof: 0,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
if !showOnChainInfo {

View File

@ -491,6 +491,8 @@ func TestAPIDealFlowReal(t *testing.T) {
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
saminer.PreCommitChallengeDelay = 5
t.Run("basic", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, false)
})
@ -498,6 +500,10 @@ func TestAPIDealFlowReal(t *testing.T) {
t.Run("fast-retrieval", func(t *testing.T) {
test.TestDealFlow(t, builder, time.Second, false, true)
})
t.Run("retrieval-second", func(t *testing.T) {
test.TestSenondDealRetrieval(t, builder, time.Second)
})
}
func TestDealMining(t *testing.T) {

View File

@ -64,7 +64,7 @@ func NewSectorBlocks(miner *storage.Miner, ds dtypes.MetadataDS) *SectorBlocks {
return sbc
}
func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset uint64, size abi.UnpaddedPieceSize) error {
func (st *SectorBlocks) writeRef(dealID abi.DealID, sectorID abi.SectorNumber, offset abi.PaddedPieceSize, size abi.UnpaddedPieceSize) error {
st.keyLk.Lock() // TODO: make this multithreaded
defer st.keyLk.Unlock()
@ -102,7 +102,7 @@ func (st *SectorBlocks) AddPiece(ctx context.Context, size abi.UnpaddedPieceSize
return 0, err
}
err = st.writeRef(d.DealID, sn, offset, size)
err = st.writeRef(d.DealID, sn, abi.PaddedPieceSize(offset), size)
if err != nil {
return 0, xerrors.Errorf("writeRef: %w", err)
}