diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go index c9cad6ae5..26123e757 100644 --- a/lib/sectorbuilder/sectorbuilder.go +++ b/lib/sectorbuilder/sectorbuilder.go @@ -110,6 +110,10 @@ func (sb *SectorBuilder) GetAllStagedSectors() ([]uint64, error) { return out, nil } +func (sb *SectorBuilder) GetAllSealedSectors() ([]SealedSectorMetadata, error) { + return sectorbuilder.GetAllSealedSectors(sb.handle) +} + func (sb *SectorBuilder) GeneratePoSt(sectorInfo SortedSectorInfo, challengeSeed [CommLen]byte, faults []uint64) ([]byte, error) { // Wait, this is a blocking method with no way of interrupting it? // does it checkpoint itself? diff --git a/storage/commitment/tracker.go b/storage/commitment/tracker.go index aa50c705e..10510fba0 100644 --- a/storage/commitment/tracker.go +++ b/storage/commitment/tracker.go @@ -136,3 +136,12 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector return cid.Undef, ctx.Err() } } + +func (ct *Tracker) CheckCommitment(miner address.Address, sectorId uint64) (bool, error) { + key := commitmentKey(miner, sectorId) + + ct.lk.Lock() + defer ct.lk.Unlock() + + return ct.commitments.Has(key) +} diff --git a/storage/miner.go b/storage/miner.go index 0ab1ce009..81880f86b 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -2,6 +2,7 @@ package storage import ( "context" + "github.com/filecoin-project/go-sectorbuilder/sealing_state" "sync" "github.com/ipfs/go-cid" @@ -93,10 +94,47 @@ func (m *Miner) Run(ctx context.Context) error { return nil } +func (m *Miner) commitUntrackedSectors(ctx context.Context) error { + sealed, err := m.secst.Sealed() + if err != nil { + return err + } + + for _, s := range sealed { + has, err := m.commt.CheckCommitment(m.maddr, s.SectorID) + if err != nil { + log.Error("checking commitment: ", err) + } + + if has { + continue + } + + log.Warnf("Missing commitment for sector %d, committing sector", s.SectorID) + + if err := m.commitSector(ctx, sectorbuilder.SectorSealingStatus{ + SectorID: s.SectorID, + State: sealing_state.Sealed, + CommD: s.CommD, + CommR: s.CommR, + Proof: s.Proof, + Pieces: s.Pieces, + Ticket: s.Ticket, + }); err != nil { + log.Error("Committing uncommitted sector failed: ", err) + } + } + return nil +} + func (m *Miner) handlePostingSealedSectors(ctx context.Context) { incoming := m.secst.Incoming() defer m.secst.CloseIncoming(incoming) + if err := m.commitUntrackedSectors(ctx); err != nil { + log.Error(err) + } + for { select { case sinfo, ok := <-incoming: diff --git a/storage/sector/store.go b/storage/sector/store.go index 5a4b6a9a3..f82505837 100644 --- a/storage/sector/store.go +++ b/storage/sector/store.go @@ -263,6 +263,10 @@ func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.Sect return s.sb.SealStatus(sector) } +func (s *Store) Sealed() ([]sectorbuilder.SealedSectorMetadata, error) { + return s.sb.GetAllSealedSectors() +} + func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) { sbsi := make([]sectorbuilder.SectorInfo, len(sectors)) for k, sector := range sectors {