Merge pull request #3710 from filecoin-project/feat/window-post-faulty-sectors

windowed post generation now returns faulty sectors
This commit is contained in:
Łukasz Magiera 2020-09-11 11:15:32 +02:00 committed by GitHub
commit b89aa02d77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 191 additions and 129 deletions

3
.gitmodules vendored
View File

@ -8,3 +8,6 @@
[submodule "extern/test-vectors"]
path = extern/test-vectors
url = https://github.com/filecoin-project/test-vectors.git
[submodule "extern/fil-blst"]
path = extern/fil-blst
url = https://github.com/filecoin-project/fil-blst.git

View File

@ -192,7 +192,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
// Drop the partition
err = parts[0].Sectors.ForEach(func(sid uint64) error {
return miner.StorageMiner.(*impl.StorageMinerAPI).IStorageMgr.(*mock.SectorMgr).MarkFailed(abi.SectorID{
return miner.StorageMiner.(*impl.StorageMinerAPI).IStorageMgr.(*mock.SectorMgr).MarkCorrupted(abi.SectorID{
Miner: abi.ActorID(mid),
Number: abi.SectorNumber(sid),
}, true)

1
extern/fil-blst vendored Submodule

@ -0,0 +1 @@
Subproject commit 5f93488fc0dbfb450f2355269f18fc67010d59bb

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit 40569104603407c999d6c9e4c3f1228cbd4d0e5c
Subproject commit f640612a1a1f7a2dd8b3a49e1531db0aa0f63447

View File

@ -168,50 +168,34 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
}
}
func post(t *testing.T, sealer *Sealer, seals ...seal) time.Time {
/*randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7}
func post(t *testing.T, sealer *Sealer, skipped []abi.SectorID, seals ...seal) {
randomness := abi.PoStRandomness{0, 9, 2, 7, 6, 5, 4, 3, 2, 1, 0, 9, 8, 7, 6, 45, 3, 2, 1, 0, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 9, 7}
sis := make([]abi.SectorInfo, len(seals))
sis := make([]saproof.SectorInfo, len(seals))
for i, s := range seals {
sis[i] = abi.SectorInfo{
RegisteredProof: sealProofType,
SectorNumber: s.id.Number,
SealedCID: s.cids.Sealed,
sis[i] = saproof.SectorInfo{
SealProof: sealProofType,
SectorNumber: s.id.Number,
SealedCID: s.cids.Sealed,
}
}
candidates, err := sealer.GenerateEPostCandidates(context.TODO(), seals[0].id.Miner, sis, randomness, []abi.SectorNumber{})
if err != nil {
t.Fatalf("%+v", err)
}*/
fmt.Println("skipping post")
genCandidates := time.Now()
/*if len(candidates) != 1 {
t.Fatal("expected 1 candidate")
proofs, skp, err := sealer.GenerateWindowPoSt(context.TODO(), seals[0].id.Miner, sis, randomness)
if len(skipped) > 0 {
require.Error(t, err)
require.EqualValues(t, skipped, skp)
return
}
candidatesPrime := make([]abi.PoStCandidate, len(candidates))
for idx := range candidatesPrime {
candidatesPrime[idx] = candidates[idx].Candidate
}
proofs, err := sealer.ComputeElectionPoSt(context.TODO(), seals[0].id.Miner, sis, randomness, candidatesPrime)
if err != nil {
t.Fatalf("%+v", err)
}
ePoStChallengeCount := ElectionPostChallengeCount(uint64(len(sis)), 0)
ok, err := ProofVerifier.VerifyElectionPost(context.TODO(), abi.PoStVerifyInfo{
Randomness: randomness,
Candidates: candidatesPrime,
Proofs: proofs,
EligibleSectors: sis,
Prover: seals[0].id.Miner,
ChallengeCount: ePoStChallengeCount,
ok, err := ProofVerifier.VerifyWindowPoSt(context.TODO(), saproof.WindowPoStVerifyInfo{
Randomness: randomness,
Proofs: proofs,
ChallengedSectors: sis,
Prover: seals[0].id.Miner,
})
if err != nil {
t.Fatalf("%+v", err)
@ -219,8 +203,21 @@ func post(t *testing.T, sealer *Sealer, seals ...seal) time.Time {
if !ok {
t.Fatal("bad post")
}
*/
return genCandidates
}
func corrupt(t *testing.T, sealer *Sealer, id abi.SectorID) {
paths, done, err := sealer.sectors.AcquireSector(context.Background(), id, stores.FTSealed, 0, stores.PathStorage)
require.NoError(t, err)
defer done()
log.Infof("corrupt %s", paths.Sealed)
f, err := os.OpenFile(paths.Sealed, os.O_RDWR, 0664)
require.NoError(t, err)
_, err = f.WriteAt(bytes.Repeat([]byte{'d'}, 2048), 0)
require.NoError(t, err)
require.NoError(t, f.Close())
}
func getGrothParamFileAndVerifyingKeys(s abi.SectorSize) {
@ -299,11 +296,11 @@ func TestSealAndVerify(t *testing.T) {
commit := time.Now()
genCandidiates := post(t, sb, s)
post(t, sb, nil, s)
epost := time.Now()
post(t, sb, s)
post(t, sb, nil, s)
if err := sb.FinalizeSector(context.TODO(), si, nil); err != nil {
t.Fatalf("%+v", err)
@ -313,8 +310,7 @@ func TestSealAndVerify(t *testing.T) {
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("Commit: %s\n", commit.Sub(precommit).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(commit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(commit).String())
}
func TestSealPoStNoCommit(t *testing.T) {
@ -370,16 +366,15 @@ func TestSealPoStNoCommit(t *testing.T) {
t.Fatal(err)
}
genCandidiates := post(t, sb, s)
post(t, sb, nil, s)
epost := time.Now()
fmt.Printf("PreCommit: %s\n", precommit.Sub(start).String())
fmt.Printf("GenCandidates: %s\n", genCandidiates.Sub(precommit).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(genCandidiates).String())
fmt.Printf("EPoSt: %s\n", epost.Sub(precommit).String())
}
func TestSealAndVerify2(t *testing.T) {
func TestSealAndVerify3(t *testing.T) {
defer requireFDsClosed(t, openFDs(t))
if runtime.NumCPU() < 10 && os.Getenv("CI") == "" { // don't bother on slow hardware
@ -419,22 +414,32 @@ func TestSealAndVerify2(t *testing.T) {
si1 := abi.SectorID{Miner: miner, Number: 1}
si2 := abi.SectorID{Miner: miner, Number: 2}
si3 := abi.SectorID{Miner: miner, Number: 3}
s1 := seal{id: si1}
s2 := seal{id: si2}
s3 := seal{id: si3}
wg.Add(2)
wg.Add(3)
go s1.precommit(t, sb, si1, wg.Done) //nolint: staticcheck
time.Sleep(100 * time.Millisecond)
go s2.precommit(t, sb, si2, wg.Done) //nolint: staticcheck
time.Sleep(100 * time.Millisecond)
go s3.precommit(t, sb, si3, wg.Done) //nolint: staticcheck
wg.Wait()
wg.Add(2)
wg.Add(3)
go s1.commit(t, sb, wg.Done) //nolint: staticcheck
go s2.commit(t, sb, wg.Done) //nolint: staticcheck
go s3.commit(t, sb, wg.Done) //nolint: staticcheck
wg.Wait()
post(t, sb, s1, s2)
post(t, sb, nil, s1, s2, s3)
corrupt(t, sb, si1)
corrupt(t, sb, si2)
post(t, sb, []abi.SectorID{si1, si2}, s1, s2, s3)
}
func BenchmarkWriteWithAlignment(b *testing.B) {

View File

@ -40,8 +40,21 @@ func (sb *Sealer) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
}
defer done()
proof, err := ffi.GenerateWindowPoSt(minerID, privsectors, randomness)
return proof, skipped, err
if len(skipped) > 0 {
return nil, skipped, xerrors.Errorf("pubSectorToPriv skipped some sectors")
}
proof, faulty, err := ffi.GenerateWindowPoSt(minerID, privsectors, randomness)
var faultyIDs []abi.SectorID
for _, f := range faulty {
faultyIDs = append(faultyIDs, abi.SectorID{
Miner: minerID,
Number: f,
})
}
return proof, faultyIDs, err
}
func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorInfo []proof.SectorInfo, faults []abi.SectorNumber, rpt func(abi.RegisteredSealProof) (abi.RegisteredPoStProof, error)) (ffi.SortedPrivateSectorInfo, []abi.SectorID, func(), error) {

View File

@ -66,8 +66,9 @@ const (
)
type sectorState struct {
pieces []cid.Cid
failed bool
pieces []cid.Cid
failed bool
corrupted bool
state int
@ -251,6 +252,18 @@ func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error {
return nil
}
func (mgr *SectorMgr) MarkCorrupted(sid abi.SectorID, corrupted bool) error {
mgr.lk.Lock()
defer mgr.lk.Unlock()
ss, ok := mgr.sectors[sid]
if !ok {
return fmt.Errorf("no such sector in storage")
}
ss.corrupted = corrupted
return nil
}
func opFinishWait(ctx context.Context) {
val, ok := ctx.Value("opfinish").(chan struct{})
if !ok {
@ -275,6 +288,8 @@ func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorI
si := make([]proof.SectorInfo, 0, len(sectorInfo))
var skipped []abi.SectorID
var err error
for _, info := range sectorInfo {
sid := abi.SectorID{
Miner: minerID,
@ -283,13 +298,18 @@ func (mgr *SectorMgr) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorI
_, found := mgr.sectors[sid]
if found && !mgr.sectors[sid].failed {
if found && !mgr.sectors[sid].failed && !mgr.sectors[sid].corrupted {
si = append(si, info)
} else {
skipped = append(skipped, sid)
err = xerrors.Errorf("skipped some sectors")
}
}
if err != nil {
return nil, skipped, err
}
return generateFakePoSt(si, abi.RegisteredSealProof.RegisteredWindowPoStProof, randomness), skipped, nil
}

6
go.mod
View File

@ -2,8 +2,6 @@ module github.com/filecoin-project/lotus
go 1.14
replace github.com/supranational/blst => github.com/supranational/blst v0.1.2-alpha.1
require (
contrib.go.opencensus.io/exporter/jaeger v0.1.0
contrib.go.opencensus.io/exporter/prometheus v0.1.0
@ -136,3 +134,7 @@ replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v
replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi
replace github.com/filecoin-project/test-vectors => ./extern/test-vectors
replace github.com/supranational/blst => ./extern/fil-blst/blst
replace github.com/filecoin-project/fil-blst => ./extern/fil-blst

2
go.sum
View File

@ -1325,8 +1325,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/supranational/blst v0.1.2-alpha.1 h1:v0UqVlvbRNZIaSeMPr+T01kvTUq1h0EZuZ6gnDR1Mlg=
github.com/supranational/blst v0.1.2-alpha.1/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=

View File

@ -337,94 +337,114 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
Proofs: nil,
}
var sinfos []proof.SectorInfo
sidToPart := map[abi.SectorNumber]uint64{}
skipCount := uint64(0)
postSkipped := bitfield.New()
var postOut []proof.PoStProof
for partIdx, partition := range partitions {
// TODO: Can do this in parallel
toProve, err := partition.ActiveSectors()
for retries := 0; retries < 5; retries++ {
var sinfos []proof.SectorInfo
sidToPart := map[abi.SectorNumber]int{}
for partIdx, partition := range partitions {
// TODO: Can do this in parallel
toProve, err := partition.ActiveSectors()
if err != nil {
return nil, xerrors.Errorf("getting active sectors: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
}
toProve, err = bitfield.SubtractBitField(toProve, postSkipped)
if err != nil {
return nil, xerrors.Errorf("toProve - postSkipped: %w", err)
}
good, err := s.checkSectors(ctx, toProve)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
}
sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
skipCount += sc
ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
if len(ssi) == 0 {
continue
}
sinfos = append(sinfos, ssi...)
for _, si := range ssi {
sidToPart[si.SectorNumber] = partIdx
}
params.Partitions = append(params.Partitions, miner.PoStPartition{
Index: uint64(partIdx),
Skipped: skipped,
})
}
if len(sinfos) == 0 {
// nothing to prove..
return nil, errNoPartitions
}
log.Infow("running windowPost",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, xerrors.Errorf("getting active sectors: %w", err)
return nil, err
}
toProve, err = bitfield.MergeBitFields(toProve, partition.Recoveries)
if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
var ps []abi.SectorID
postOut, ps, err = s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
elapsed := time.Since(tsStart)
log.Infow("computing window PoSt", "elapsed", elapsed)
if err == nil {
break
}
good, err := s.checkSectors(ctx, toProve)
if err != nil {
return nil, xerrors.Errorf("checking sectors to skip: %w", err)
if len(ps) == 0 {
return nil, xerrors.Errorf("running post failed: %w", err)
}
skipped, err := bitfield.SubtractBitField(toProve, good)
if err != nil {
return nil, xerrors.Errorf("toProve - good: %w", err)
log.Warnw("generate window PoSt skipped sectors", "sectors", ps, "error", err, "try", retries)
skipCount += uint64(len(ps))
for _, sector := range ps {
postSkipped.Set(uint64(sector.Number))
}
sc, err := skipped.Count()
if err != nil {
return nil, xerrors.Errorf("getting skipped sector count: %w", err)
}
skipCount += sc
ssi, err := s.sectorsForProof(ctx, good, partition.Sectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
if len(ssi) == 0 {
continue
}
sinfos = append(sinfos, ssi...)
for _, si := range ssi {
sidToPart[si.SectorNumber] = uint64(partIdx)
}
params.Partitions = append(params.Partitions, miner.PoStPartition{
Index: uint64(partIdx),
Skipped: skipped,
})
}
if len(sinfos) == 0 {
// nothing to prove..
return nil, errNoPartitions
}
log.Infow("running windowPost",
"chain-random", rand,
"deadline", di,
"height", ts.Height(),
"skipped", skipCount)
tsStart := build.Clock.Now()
mid, err := address.IDFromAddress(s.actor)
if err != nil {
return nil, err
}
postOut, postSkipped, err := s.prover.GenerateWindowPoSt(ctx, abi.ActorID(mid), sinfos, abi.PoStRandomness(rand))
if err != nil {
return nil, xerrors.Errorf("running post failed: %w", err)
}
if len(postOut) == 0 {
return nil, xerrors.Errorf("received proofs back from generate window post")
return nil, xerrors.Errorf("received no proofs back from generate window post")
}
params.Proofs = postOut
for _, sector := range postSkipped {
params.Partitions[sidToPart[sector.Number]].Skipped.Set(uint64(sector.Number))
}
elapsed := time.Since(tsStart)
commEpoch := di.Open
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
@ -433,7 +453,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
params.ChainCommitEpoch = commEpoch
params.ChainCommitRand = commRand
log.Infow("submitting window PoSt", "elapsed", elapsed)
log.Infow("submitting window PoSt")
return params, nil
}