storageminer: Handle uncommited sectors on start

This commit is contained in:
Łukasz Magiera 2019-10-29 20:46:32 +01:00
parent 6d594bab67
commit 169c285fb3
4 changed files with 55 additions and 0 deletions

View File

@ -110,6 +110,10 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) {
return out, nil return out, nil
} }
func (sb *SectorBuilder) GetAllSealedSectors() ([]SealedSectorMetadata, error) {
return sectorbuilder.GetAllSealedSectors(sb.handle)
}
func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) {
// Wait, this is a blocking method with no way of interrupting it? // Wait, this is a blocking method with no way of interrupting it?
// does it checkpoint itself? // does it checkpoint itself?

View File

@ -136,3 +136,12 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector
return cid.Undef, ctx.Err() return cid.Undef, ctx.Err()
} }
} }
func (ct *Tracker) CheckCommitment(miner address.Address, sectorId uint64) (bool, error) {
key := commitmentKey(miner, sectorId)
ct.lk.Lock()
defer ct.lk.Unlock()
return ct.commitments.Has(key)
}

View File

@ -2,6 +2,7 @@ package storage
import ( import (
"context" "context"
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
"sync" "sync"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -93,10 +94,47 @@ func (m *Miner) Run(ctx context.Context) error {
return nil return nil
} }
func (m *Miner) commitUntrackedSectors(ctx context.Context) error {
sealed, err := m.secst.Sealed()
if err != nil {
return err
}
for _, s := range sealed {
has, err := m.commt.CheckCommitment(m.maddr, s.SectorID)
if err != nil {
log.Error("checking commitment: ", err)
}
if has {
continue
}
log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID)
if err := m.commitSector(ctx, sectorbuilder.SectorSealingStatus{
SectorID: s.SectorID,
State: sealing_state.Sealed,
CommD: s.CommD,
CommR: s.CommR,
Proof: s.Proof,
Pieces: s.Pieces,
Ticket: s.Ticket,
}); err != nil {
log.Error("Committing uncommitted sector failed: ", err)
}
}
return nil
}
func (m *Miner) handlePostingSealedSectors(ctx context.Context) { func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
incoming := m.secst.Incoming() incoming := m.secst.Incoming()
defer m.secst.CloseIncoming(incoming) defer m.secst.CloseIncoming(incoming)
if err := m.commitUntrackedSectors(ctx); err != nil {
log.Error(err)
}
for { for {
select { select {
case sinfo, ok := <-incoming: case sinfo, ok := <-incoming:

View File

@ -263,6 +263,10 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect
return s.sb.SealStatus(sector) return s.sb.SealStatus(sector)
} }
func (s *Store) Sealed() ([]sectorbuilder.SealedSectorMetadata, error) {
return s.sb.GetAllSealedSectors()
}
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
sbsi := make([]sectorbuilder.SectorInfo, len(sectors)) sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
for k, sector := range sectors { for k, sector := range sectors {