package sector import ( "context" "io" "io/ioutil" "os" "sync" "time" "github.com/filecoin-project/go-lotus/lib/sectorbuilder" logging "github.com/ipfs/go-log" ) var log = logging.Logger("sectorstore") // TODO: eventually handle sector storage here instead of in rust-sectorbuilder type Store struct { lk sync.Mutex sb *sectorbuilder.SectorBuilder waiting map[uint64]chan struct{} incoming []chan sectorbuilder.SectorSealingStatus // TODO: outdated chan closeCh chan struct{} } func NewStore(sb *sectorbuilder.SectorBuilder) *Store { return &Store{ sb: sb, waiting: map[uint64]chan struct{}{}, closeCh: make(chan struct{}), } } func (s *Store) Service() { go s.service() } func (s *Store) poll() { log.Info("polling for sealed sectors...") // get a list of sectors to poll s.lk.Lock() toPoll := make([]uint64, 0, len(s.waiting)) for id := range s.waiting { toPoll = append(toPoll, id) } s.lk.Unlock() var done []sectorbuilder.SectorSealingStatus // check status of each for _, sec := range toPoll { status, err := s.sb.SealStatus(sec) if err != nil { log.Errorf("getting seal status: %s", err) continue } if status.SealStatusCode == 0 { // constant pls, zero implies the last step? done = append(done, status) } } // send updates s.lk.Lock() for _, sector := range done { watch, ok := s.waiting[sector.SectorID] if ok { close(watch) delete(s.waiting, sector.SectorID) } for _, c := range s.incoming { c <- sector // TODO: ctx! } } s.lk.Unlock() } func (s *Store) service() { poll := time.Tick(5 * time.Second) for { select { case <-poll: s.poll() case <-s.closeCh: s.lk.Lock() for _, c := range s.incoming { close(c) } s.lk.Unlock() return } } } func (s *Store) AddPiece(ref string, size uint64, r io.Reader) (sectorID uint64, err error) { err = withTemp(r, func(f string) (err error) { sectorID, err = s.sb.AddPiece(ref, size, f) return err }) if err != nil { return 0, err } s.lk.Lock() _, exists := s.waiting[sectorID] if !exists { // pieces can share sectors s.waiting[sectorID] = make(chan struct{}) } s.lk.Unlock() return sectorID, nil } func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) { s.lk.Lock() var at = -1 for i, ch := range s.incoming { if ch == c { at = i } } if at == -1 { s.lk.Unlock() return } if len(s.incoming) > 1 { last := len(s.incoming) - 1 s.incoming[at] = s.incoming[last] s.incoming[last] = nil } s.incoming = s.incoming[:len(s.incoming)-1] s.lk.Unlock() } func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus { ch := make(chan sectorbuilder.SectorSealingStatus, 8) s.lk.Lock() s.incoming = append(s.incoming, ch) s.lk.Unlock() return ch } func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) { s.lk.Lock() watch, ok := s.waiting[sector] s.lk.Unlock() if ok { select { case <-watch: case <-ctx.Done(): return sectorbuilder.SectorSealingStatus{}, ctx.Err() } } return s.sb.SealStatus(sector) } func (s *Store) Stop() { close(s.closeCh) } func withTemp(r io.Reader, cb func(string) error) error { f, err := ioutil.TempFile(os.TempDir(), "lotus-temp-") if err != nil { return err } if _, err := io.Copy(f, r); err != nil { return err } if err := f.Close(); err != nil { return err } err = cb(f.Name()) if err != nil { if err := os.Remove(f.Name()); err != nil { log.Errorf("couldn't remove temp file '%s'", f.Name()) } return err } return os.Remove(f.Name()) }