From e6493afd465b2bb6fa0609b9e2fc8ea680186772 Mon Sep 17 00:00:00 2001
From: Łukasz Magiera
Date: Wed, 14 Aug 2019 23:33:52 +0200
Subject: [PATCH] move poller to sector store

---
 lib/sectorbuilder/poll.go          | 56 ------------------------
 lib/sectorbuilder/sectorbuilder.go | 13 ------
 node/builder.go                    |  2 +-
 node/modules/storageminer.go       | 18 --------
 storage/sector/store.go            | 69 ++++++++++++++++++++++++------
 5 files changed, 56 insertions(+), 102 deletions(-)
 delete mode 100644 lib/sectorbuilder/poll.go

diff --git a/lib/sectorbuilder/poll.go b/lib/sectorbuilder/poll.go
deleted file mode 100644
index edf425f37..000000000
--- a/lib/sectorbuilder/poll.go
+++ /dev/null
@@ -1,56 +0,0 @@
-package sectorbuilder
-
-import (
-	"context"
-	"time"
-)
-
-// TODO: really need to get a callbacks API from the rust-sectorbuilder
-func (sb *SectorBuilder) pollForSealedSectors(ctx context.Context) {
-	watching := make(map[uint64]bool)
-
-	staged, err := sb.GetAllStagedSectors()
-	if err != nil {
-		// TODO: this is probably worth shutting the miner down over until we
-		//  have better recovery mechanisms
-		log.Errorf("failed to get staged sectors: %s", err)
-	}
-	for _, s := range staged {
-		watching[s.SectorID] = true
-	}
-
-	tick := time.Tick(time.Second * 5)
-	for {
-		select {
-		case <-tick:
-			log.Info("polling for sealed sectors...")
-
-			// add new staged sectors to watch list
-			staged, err := sb.GetAllStagedSectors()
-			if err != nil {
-				log.Errorf("in loop: failed to get staged sectors: %s", err)
-				continue
-			}
-
-			for _, s := range staged {
-				watching[s.SectorID] = true
-			}
-
-			for s := range watching {
-				status, err := sb.SealStatus(s)
-				if err != nil {
-					log.Errorf("getting seal status: %s", err)
-					continue
-				}
-
-				if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
-					delete(watching, s)
-					sb.sschan <- status
-				}
-			}
-		case <-ctx.Done():
-			close(sb.sschan)
-			return
-		}
-	}
-}
diff --git a/lib/sectorbuilder/sectorbuilder.go b/lib/sectorbuilder/sectorbuilder.go
index 28fd72d22..8ad6d61f4 100644
--- a/lib/sectorbuilder/sectorbuilder.go
+++ b/lib/sectorbuilder/sectorbuilder.go
@@ -1,7 +1,6 @@
 package sectorbuilder
 
 import (
-	"context"
 	"encoding/binary"
 	"unsafe"
 
@@ -22,8 +21,6 @@ const CommLen = sectorbuilder.CommitmentBytesLen
 
 type SectorBuilder struct {
 	handle unsafe.Pointer
-
-	sschan chan SectorSealingStatus
 }
 
 type SectorBuilderConfig struct {
@@ -44,7 +41,6 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) {
 
 	return &SectorBuilder{
 		handle: sbp,
-		sschan: make(chan SectorSealingStatus, 32),
 	}, nil
 }
 
@@ -60,10 +56,6 @@ func sectorIDtoBytes(sid uint64) [31]byte {
 	return out
 }
 
-func (sb *SectorBuilder) Run(ctx context.Context) {
-	go sb.pollForSealedSectors(ctx)
-}
-
 func (sb *SectorBuilder) Destroy() {
 	sectorbuilder.DestroySectorBuilder(sb.handle)
 }
@@ -95,11 +87,6 @@ func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSee
 	return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed)
 }
 
-func (sb *SectorBuilder) SealedSectorChan() <-chan SectorSealingStatus {
-	// is this ever going to be multi-consumer? If so, switch to using pubsub/eventbus
-	return sb.sschan
-}
-
 var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
 
 func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) {
diff --git a/node/builder.go b/node/builder.go
index 9155da058..ff197b425 100644
--- a/node/builder.go
+++ b/node/builder.go
@@ -226,7 +226,7 @@ func Online() Option {
 
 		// Storage miner
 		ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
-			Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder),
+			Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
 			Override(new(*sector.Store), sector.NewStore),
 
 			Override(new(*storage.Miner), modules.StorageMiner),
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go
index ef24d6ae7..df068c99c 100644
--- a/node/modules/storageminer.go
+++ b/node/modules/storageminer.go
@@ -64,24 +64,6 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui
 	}
 }
 
-func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.SectorBuilderConfig) (*sectorbuilder.SectorBuilder, error) {
-	sb, err := sectorbuilder.New(sbc)
-	if err != nil {
-		return nil, err
-	}
-
-	ctx := helpers.LifecycleCtx(mctx, lc)
-
-	lc.Append(fx.Hook{
-		OnStart: func(context.Context) error {
-			sb.Run(ctx)
-			return nil
-		},
-	})
-
-	return sb, nil
-}
-
 func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
 	maddr, err := minerAddrFromDS(ds)
 	if err != nil {
diff --git a/storage/sector/store.go b/storage/sector/store.go
index 8bce6a7e0..6968958a5 100644
--- a/storage/sector/store.go
+++ b/storage/sector/store.go
@@ -2,13 +2,19 @@ package sector
 
 import (
 	"context"
-	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
 	"io"
 	"io/ioutil"
 	"os"
 	"sync"
+	"time"
+
+	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
+
+	logging "github.com/ipfs/go-log"
 )
 
+var log = logging.Logger("sectorstore")
+
 // TODO: eventually handle sector storage here instead of in rust-sectorbuilder
 type Store struct {
 	lk sync.Mutex
@@ -33,22 +39,55 @@ func (s *Store) Service() {
 	go s.service()
 }
 
+func (s *Store) poll() {
+	log.Info("polling for sealed sectors...")
+
+	// get a list of sectors to poll
+	s.lk.Lock()
+	toPoll := make([]uint64, 0, len(s.waiting))
+
+	for id := range s.waiting {
+		toPoll = append(toPoll, id)
+	}
+	s.lk.Unlock()
+
+	var done []sectorbuilder.SectorSealingStatus
+
+	// check status of each
+	for _, sec := range toPoll {
+		status, err := s.sb.SealStatus(sec)
+		if err != nil {
+			log.Errorf("getting seal status: %s", err)
+			continue
+		}
+
+		if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
+			done = append(done, status)
+		}
+	}
+
+	// send updates
+	s.lk.Lock()
+	for _, sector := range done {
+		watch, ok := s.waiting[sector.SectorID]
+		if ok {
+			close(watch)
+			delete(s.waiting, sector.SectorID)
+		}
+		for _, c := range s.incoming {
+			c <- sector // TODO: ctx!
+		}
+	}
+	s.lk.Unlock()
+}
+
 func (s *Store) service() {
-	sealed := s.sb.SealedSectorChan()
+	poll := time.Tick(5 * time.Second)
 
 	for {
 		select {
-		case sector := <-sealed:
-			s.lk.Lock()
-			watch, ok := s.waiting[sector.SectorID]
-			if ok {
-				close(watch)
-				delete(s.waiting, sector.SectorID)
-			}
-			for _, c := range s.incoming {
-				c <- sector // TODO: ctx!
- } - s.lk.Unlock() + case <-poll: + s.poll() case <-s.close: s.lk.Lock() for _, c := range s.incoming { @@ -68,12 +107,14 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint6 if err != nil { return 0, err } + s.lk.Lock() _, exists := s.waiting[sectorID] - if !exists { + if !exists { // pieces can share sectors s.waiting[sectorID] = make(chan struct{}) } s.lk.Unlock() + return sectorID, nil }
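
A note on the `// TODO: ctx!` this patch carries over: `poll()` still performs the `c <- sector` sends while `s.lk` is held, so a single stalled subscriber can block the store, and every other waiter, indefinitely. Below is a minimal sketch of one way to retire that TODO, assuming a context is threaded down from `service()`; the `notify` method and its `ctx` parameter are illustrative and not part of this patch:

package sector

import (
	"context"

	"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
)

// notify is a hypothetical replacement for the "send updates" block at the
// end of poll(). It keeps the same waiter bookkeeping, but each send to an
// incoming channel can be abandoned once ctx is cancelled, so a slow
// subscriber can no longer wedge the store while s.lk is held.
func (s *Store) notify(ctx context.Context, done []sectorbuilder.SectorSealingStatus) {
	s.lk.Lock()
	defer s.lk.Unlock()

	for _, sector := range done {
		// wake any callers waiting on this specific sector
		if watch, ok := s.waiting[sector.SectorID]; ok {
			close(watch)
			delete(s.waiting, sector.SectorID)
		}

		// fan the sealing status out to all subscribers
		for _, c := range s.incoming {
			select {
			case c <- sector:
			case <-ctx.Done():
				return // shutting down; drop the remaining updates
			}
		}
	}
}

`poll()` would then accept the context from `service()` and call `s.notify(ctx, done)` in place of its final locked loop. Buffering the `incoming` channels, or snapshotting them under the lock and sending outside it, are the other obvious shapes for the same fix.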