Merge pull request #2877 from filecoin-project/fix/unsealbug
Fix unseal bug, reproduce in test
This commit is contained in:
commit
4a831f3dd2
@ -73,6 +73,7 @@ type StorageMiner interface {
|
||||
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error
|
||||
MarketListDeals(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error)
|
||||
MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error)
|
||||
MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error)
|
||||
MarketSetAsk(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error
|
||||
MarketGetAsk(ctx context.Context) (*storagemarket.SignedStorageAsk, error)
|
||||
|
@ -218,6 +218,7 @@ type StorageMinerStruct struct {
|
||||
MarketImportDealData func(context.Context, cid.Cid, string) error `perm:"write"`
|
||||
MarketListDeals func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||
MarketListRetrievalDeals func(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) `perm:"read"`
|
||||
MarketGetDealUpdates func(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) `perm:"read"`
|
||||
MarketListIncompleteDeals func(ctx context.Context) ([]storagemarket.MinerDeal, error) `perm:"read"`
|
||||
MarketSetAsk func(ctx context.Context, price types.BigInt, verifiedPrice types.BigInt, duration abi.ChainEpoch, minPieceSize abi.PaddedPieceSize, maxPieceSize abi.PaddedPieceSize) error `perm:"admin"`
|
||||
MarketGetAsk func(ctx context.Context) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
|
||||
@ -1005,6 +1006,10 @@ func (c *StorageMinerStruct) MarketListRetrievalDeals(ctx context.Context) ([]re
|
||||
return c.Internal.MarketListRetrievalDeals(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) {
|
||||
return c.Internal.MarketGetDealUpdates(ctx, d)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {
|
||||
return c.Internal.MarketListIncompleteDeals(ctx)
|
||||
}
|
||||
|
@ -144,6 +144,61 @@ func makeDeal(t *testing.T, ctx context.Context, rseed int, client *impl.FullNod
|
||||
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, carExport, data)
|
||||
}
|
||||
|
||||
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
||||
_ = os.Setenv("BELLMAN_NO_GPU", "1")
|
||||
|
||||
ctx := context.Background()
|
||||
n, sn := b(t, 1, oneMiner)
|
||||
client := n[0].FullNode.(*impl.FullNodeAPI)
|
||||
miner := sn[0]
|
||||
|
||||
addrinfo, err := client.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := miner.NetConnect(ctx, addrinfo); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
|
||||
mine := int64(1)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for atomic.LoadInt64(&mine) == 1 {
|
||||
time.Sleep(blocktime)
|
||||
if err := sn[0].MineOne(ctx, MineNext); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
data := make([]byte, 1600)
|
||||
rand.New(rand.NewSource(int64(8))).Read(data)
|
||||
|
||||
r := bytes.NewReader(data)
|
||||
fcid, err := client.ClientImportLocal(ctx, r)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Println("FILE CID: ", fcid)
|
||||
|
||||
deal := startDeal(t, ctx, miner, client, fcid, true)
|
||||
|
||||
waitDealPublished(t, ctx, miner, deal)
|
||||
fmt.Println("deal published, retrieving")
|
||||
// Retrieval
|
||||
info, err := client.ClientGetDealInfo(ctx, *deal)
|
||||
require.NoError(t, err)
|
||||
|
||||
testRetrieval(t, ctx, err, client, fcid, &info.PieceCID, false, data)
|
||||
atomic.AddInt64(&mine, -1)
|
||||
fmt.Println("shutting down mining")
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestSenondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
||||
_ = os.Setenv("BELLMAN_NO_GPU", "1")
|
||||
|
||||
@ -275,6 +330,34 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
func waitDealPublished(t *testing.T, ctx context.Context, miner TestStorageNode, deal *cid.Cid) {
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
updates, err := miner.MarketGetDealUpdates(subCtx, *deal)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("context timeout")
|
||||
case di := <-updates:
|
||||
switch di.State {
|
||||
case storagemarket.StorageDealProposalRejected:
|
||||
t.Fatal("deal rejected")
|
||||
case storagemarket.StorageDealFailing:
|
||||
t.Fatal("deal failed")
|
||||
case storagemarket.StorageDealError:
|
||||
t.Fatal("deal errored", di.Message)
|
||||
case storagemarket.StorageDealFinalizing, storagemarket.StorageDealSealing, storagemarket.StorageDealActive:
|
||||
fmt.Println("COMPLETE", di)
|
||||
return
|
||||
}
|
||||
fmt.Println("Deal state: ", storagemarket.DealStates[di.State])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startSealingWaiting(t *testing.T, ctx context.Context, miner TestStorageNode) {
|
||||
snums, err := miner.SectorsList(ctx)
|
||||
require.NoError(t, err)
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/filecoin-project/sector-storage/storiface"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -57,7 +58,11 @@ func (rpn *retrievalProviderNode) UnsealSector(ctx context.Context, sectorID abi
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
err := rpn.sealer.ReadPiece(ctx, w, sid, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, *si.CommD)
|
||||
var commD cid.Cid
|
||||
if si.CommD != nil {
|
||||
commD = *si.CommD
|
||||
}
|
||||
err := rpn.sealer.ReadPiece(ctx, w, sid, storiface.UnpaddedByteIndex(offset), length, si.TicketValue, commD)
|
||||
_ = w.CloseWithError(err)
|
||||
}()
|
||||
|
||||
|
@ -298,6 +298,24 @@ func (sm *StorageMinerAPI) MarketListRetrievalDeals(ctx context.Context) ([]retr
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) MarketGetDealUpdates(ctx context.Context, d cid.Cid) (<-chan storagemarket.MinerDeal, error) {
|
||||
results := make(chan storagemarket.MinerDeal)
|
||||
unsub := sm.StorageProvider.SubscribeToEvents(func(evt storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
|
||||
if deal.ProposalCid.Equals(d) {
|
||||
select {
|
||||
case results <- deal:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
})
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
unsub()
|
||||
close(results)
|
||||
}()
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) MarketListIncompleteDeals(ctx context.Context) ([]storagemarket.MinerDeal, error) {
|
||||
return sm.StorageProvider.ListLocalDeals()
|
||||
}
|
||||
|
@ -511,6 +511,9 @@ func TestAPIDealFlow(t *testing.T) {
|
||||
t.Run("TestDoubleDealFlow", func(t *testing.T) {
|
||||
test.TestDoubleDealFlow(t, mockSbBuilder, 10*time.Millisecond)
|
||||
})
|
||||
t.Run("TestFastRetrievalDealFlow", func(t *testing.T) {
|
||||
test.TestFastRetrievalDealFlow(t, mockSbBuilder, 10*time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAPIDealFlowReal(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user