sector import: Test remote commit1 retry
This commit is contained in:
parent
9f03569cd0
commit
b2dfaae68c
@ -97,6 +97,10 @@ func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) {
|
func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) {
|
||||||
|
tm.WaitSectorsProvingAllowFails(ctx, toCheck, map[api.SectorState]struct{}{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tm *TestMiner) WaitSectorsProvingAllowFails(ctx context.Context, toCheck map[abi.SectorNumber]struct{}, okFails map[api.SectorState]struct{}) {
|
||||||
for len(toCheck) > 0 {
|
for len(toCheck) > 0 {
|
||||||
tm.FlushSealingBatches(ctx)
|
tm.FlushSealingBatches(ctx)
|
||||||
|
|
||||||
@ -109,7 +113,9 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec
|
|||||||
delete(toCheck, n)
|
delete(toCheck, n)
|
||||||
}
|
}
|
||||||
if strings.Contains(string(st.State), "Fail") {
|
if strings.Contains(string(st.State), "Fail") {
|
||||||
tm.t.Fatal("sector in a failed state", st.State)
|
if _, ok := okFails[st.State]; !ok {
|
||||||
|
tm.t.Fatal("sector in a failed state", st.State)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
221
itests/sector_import_full_test.go
Normal file
221
itests/sector_import_full_test.go
Normal file
@ -0,0 +1,221 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSectorImport(t *testing.T) {
|
||||||
|
|
||||||
|
type testCase struct {
|
||||||
|
c1handler func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request)
|
||||||
|
}
|
||||||
|
|
||||||
|
makeTest := func(mut func(*testCase)) *testCase {
|
||||||
|
tc := &testCase{
|
||||||
|
c1handler: remoteCommit1,
|
||||||
|
}
|
||||||
|
mut(tc)
|
||||||
|
return tc
|
||||||
|
}
|
||||||
|
|
||||||
|
runTest := func(tc *testCase) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
|
////////
|
||||||
|
// Start a miner node
|
||||||
|
|
||||||
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
|
||||||
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
////////
|
||||||
|
// Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely"
|
||||||
|
snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
sectorDir := t.TempDir()
|
||||||
|
|
||||||
|
maddr, err := miner.ActorAddress(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
mid, err := address.IDFromAddress(maddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
spt, err := currentSealProof(ctx, client, maddr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
ssize, err := spt.SectorSize()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
pieceSize := abi.PaddedPieceSize(ssize)
|
||||||
|
|
||||||
|
////////
|
||||||
|
// Create/Seal a sector up to pc2 outside of the pipeline
|
||||||
|
|
||||||
|
// get one sector number from the reservation done on the miner above
|
||||||
|
sn, err := snums.First()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// create all the sector identifiers
|
||||||
|
snum := abi.SectorNumber(sn)
|
||||||
|
sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum}
|
||||||
|
sref := storiface.SectorRef{ID: sid, ProofType: spt}
|
||||||
|
|
||||||
|
// create a low-level sealer instance
|
||||||
|
sealer, err := ffiwrapper.New(&basicfs.Provider{
|
||||||
|
Root: sectorDir,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// CRETE THE UNSEALED FILE
|
||||||
|
|
||||||
|
// create a reader for all-zero (CC) data
|
||||||
|
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
|
||||||
|
|
||||||
|
// create the unsealed CC sector file
|
||||||
|
pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// GENERATE THE TICKET
|
||||||
|
|
||||||
|
// get most recent valid ticket epoch
|
||||||
|
ts, err := client.ChainHead(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
|
||||||
|
|
||||||
|
// ticket entropy is cbor-seriasized miner addr
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
require.NoError(t, maddr.MarshalCBOR(buf))
|
||||||
|
|
||||||
|
// generate ticket randomness
|
||||||
|
rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// EXECUTE PRECOMMIT 1 / 2
|
||||||
|
|
||||||
|
// run PC1
|
||||||
|
pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// run pc2
|
||||||
|
scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1
|
||||||
|
finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum))
|
||||||
|
require.NoError(t, os.MkdirAll(finDst, 0777))
|
||||||
|
require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))
|
||||||
|
|
||||||
|
////////
|
||||||
|
// start http server serving sector data
|
||||||
|
|
||||||
|
m := mux.NewRouter()
|
||||||
|
m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET")
|
||||||
|
m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer)).Methods("POST")
|
||||||
|
srv := httptest.NewServer(m)
|
||||||
|
|
||||||
|
unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
|
||||||
|
|
||||||
|
////////
|
||||||
|
// import the sector and continue sealing
|
||||||
|
|
||||||
|
err = miner.SectorReceive(ctx, api.RemoteSectorMeta{
|
||||||
|
State: "PreCommitting",
|
||||||
|
Sector: sid,
|
||||||
|
Type: spt,
|
||||||
|
|
||||||
|
Pieces: []api.SectorPiece{
|
||||||
|
{
|
||||||
|
Piece: pieceInfo,
|
||||||
|
DealInfo: nil,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
TicketValue: abi.SealRandomness(rand),
|
||||||
|
TicketEpoch: ticketEpoch,
|
||||||
|
|
||||||
|
PreCommit1Out: pc1out,
|
||||||
|
|
||||||
|
CommD: &scids.Unsealed,
|
||||||
|
CommR: &scids.Sealed,
|
||||||
|
|
||||||
|
DataUnsealed: &storiface.SectorData{
|
||||||
|
Local: false,
|
||||||
|
URL: unsealedURL,
|
||||||
|
},
|
||||||
|
DataSealed: &storiface.SectorData{
|
||||||
|
Local: false,
|
||||||
|
URL: sealedURL,
|
||||||
|
},
|
||||||
|
DataCache: &storiface.SectorData{
|
||||||
|
Local: false,
|
||||||
|
URL: cacheURL,
|
||||||
|
},
|
||||||
|
|
||||||
|
RemoteCommit1Endpoint: remoteC1URL,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// check that we see the imported sector
|
||||||
|
ng, err := miner.SectorsListNonGenesis(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, ng, 1)
|
||||||
|
require.Equal(t, snum, ng[0])
|
||||||
|
|
||||||
|
miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommit1Failed): {}})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fail first remote c1, verifies that c1 retry works
|
||||||
|
t.Run("c1-retry", runTest(makeTest(func(testCase *testCase) {
|
||||||
|
prt := sealing.MinRetryTime
|
||||||
|
sealing.MinRetryTime = time.Second
|
||||||
|
t.Cleanup(func() {
|
||||||
|
sealing.MinRetryTime = prt
|
||||||
|
})
|
||||||
|
|
||||||
|
testCase.c1handler = func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var failedOnce bool
|
||||||
|
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !failedOnce {
|
||||||
|
failedOnce = true
|
||||||
|
w.WriteHeader(http.StatusBadGateway)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
remoteCommit1(s)(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})))
|
||||||
|
}
|
@ -36,12 +36,16 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
|
|
||||||
var blockTime = 50 * time.Millisecond
|
var blockTime = 50 * time.Millisecond
|
||||||
|
|
||||||
|
////////
|
||||||
|
// Start a miner node
|
||||||
|
|
||||||
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
|
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
|
||||||
ens.InterconnectAll().BeginMining(blockTime)
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// get some sector numbers
|
////////
|
||||||
|
// Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely"
|
||||||
snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
|
snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -62,32 +66,34 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
pieceSize := abi.PaddedPieceSize(ssize)
|
pieceSize := abi.PaddedPieceSize(ssize)
|
||||||
|
|
||||||
////////
|
////////
|
||||||
// seal a sector up to pc2 outside of the pipeline
|
// Create/Seal a sector up to pc2 outside of the pipeline
|
||||||
|
|
||||||
|
// get one sector number from the reservation done on the miner above
|
||||||
sn, err := snums.First()
|
sn, err := snums.First()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// create all the sector identifiers
|
||||||
snum := abi.SectorNumber(sn)
|
snum := abi.SectorNumber(sn)
|
||||||
sid := abi.SectorID{
|
sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum}
|
||||||
Miner: abi.ActorID(mid),
|
sref := storiface.SectorRef{ID: sid, ProofType: spt}
|
||||||
Number: snum,
|
|
||||||
}
|
|
||||||
|
|
||||||
sref := storiface.SectorRef{
|
|
||||||
ID: sid,
|
|
||||||
ProofType: spt,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// create a low-level sealer instance
|
||||||
sealer, err := ffiwrapper.New(&basicfs.Provider{
|
sealer, err := ffiwrapper.New(&basicfs.Provider{
|
||||||
Root: sectorDir,
|
Root: sectorDir,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// CRETE THE UNSEALED FILE
|
||||||
|
|
||||||
|
// create a reader for all-zero (CC) data
|
||||||
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
|
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
|
||||||
|
|
||||||
// create the unsealed sector file
|
// create the unsealed CC sector file
|
||||||
pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
|
pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// GENERATE THE TICKET
|
||||||
|
|
||||||
// get most recent valid ticket epoch
|
// get most recent valid ticket epoch
|
||||||
ts, err := client.ChainHead(ctx)
|
ts, err := client.ChainHead(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -101,6 +107,8 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
|
rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// EXECUTE PRECOMMIT 1 / 2
|
||||||
|
|
||||||
// run PC1
|
// run PC1
|
||||||
pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
|
pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -109,7 +117,7 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
|
scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// make finalized cache, put it in [sectorDir]/fin-cache
|
// make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1
|
||||||
finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum))
|
finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum))
|
||||||
require.NoError(t, os.MkdirAll(finDst, 0777))
|
require.NoError(t, os.MkdirAll(finDst, 0777))
|
||||||
require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))
|
require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))
|
||||||
@ -122,6 +130,11 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST")
|
m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST")
|
||||||
srv := httptest.NewServer(m)
|
srv := httptest.NewServer(m)
|
||||||
|
|
||||||
|
unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
|
||||||
|
remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
|
||||||
|
|
||||||
////////
|
////////
|
||||||
// import the sector and continue sealing
|
// import the sector and continue sealing
|
||||||
|
|
||||||
@ -147,18 +160,18 @@ func TestSectorImportAfterPC2(t *testing.T) {
|
|||||||
|
|
||||||
DataUnsealed: &storiface.SectorData{
|
DataUnsealed: &storiface.SectorData{
|
||||||
Local: false,
|
Local: false,
|
||||||
URL: fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum),
|
URL: unsealedURL,
|
||||||
},
|
},
|
||||||
DataSealed: &storiface.SectorData{
|
DataSealed: &storiface.SectorData{
|
||||||
Local: false,
|
Local: false,
|
||||||
URL: fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum),
|
URL: sealedURL,
|
||||||
},
|
},
|
||||||
DataCache: &storiface.SectorData{
|
DataCache: &storiface.SectorData{
|
||||||
Local: false,
|
Local: false,
|
||||||
URL: fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum),
|
URL: cacheURL,
|
||||||
},
|
},
|
||||||
|
|
||||||
RemoteCommit1Endpoint: fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum),
|
RemoteCommit1Endpoint: remoteC1URL,
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
@ -126,7 +126,6 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
|
|||||||
case GetTicket:
|
case GetTicket:
|
||||||
fallthrough
|
fallthrough
|
||||||
case Packing:
|
case Packing:
|
||||||
// todo check num free
|
|
||||||
info.Return = ReturnState(meta.State) // todo dedupe states
|
info.Return = ReturnState(meta.State) // todo dedupe states
|
||||||
info.State = ReceiveSector
|
info.State = ReceiveSector
|
||||||
|
|
||||||
|
@ -19,12 +19,12 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const minRetryTime = 1 * time.Minute
|
var MinRetryTime = 1 * time.Minute
|
||||||
|
|
||||||
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
// TODO: Exponential backoff when we see consecutive failures
|
// TODO: Exponential backoff when we see consecutive failures
|
||||||
|
|
||||||
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
|
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(MinRetryTime)
|
||||||
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
||||||
log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorNumber, time.Until(retryStart))
|
log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorNumber, time.Until(retryStart))
|
||||||
select {
|
select {
|
||||||
|
Loading…
Reference in New Issue
Block a user