move poller to sector store

This commit is contained in:
Łukasz Magiera 2019-08-14 23:33:52 +02:00
parent 399f91940b
commit e6493afd46
5 changed files with 56 additions and 102 deletions

View File

@ -1,56 +0,0 @@
package sectorbuilder
import (
"context"
"time"
)
// TODO: really need to get a callbacks API from the rust-sectorbuilder
func (sb *SectorBuilder) pollForSealedSectors(ctx context.Context) {
watching := make(map[uint64]bool)
staged, err := sb.GetAllStagedSectors()
if err != nil {
// TODO: this is probably worth shutting the miner down over until we
// have better recovery mechanisms
log.Errorf("failed to get staged sectors: %s", err)
}
for _, s := range staged {
watching[s.SectorID] = true
}
tick := time.Tick(time.Second * 5)
for {
select {
case <-tick:
log.Info("polling for sealed sectors...")
// add new staged sectors to watch list
staged, err := sb.GetAllStagedSectors()
if err != nil {
log.Errorf("in loop: failed to get staged sectors: %s", err)
continue
}
for _, s := range staged {
watching[s.SectorID] = true
}
for s := range watching {
status, err := sb.SealStatus(s)
if err != nil {
log.Errorf("getting seal status: %s", err)
continue
}
if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
delete(watching, s)
sb.sschan <- status
}
}
case <-ctx.Done():
close(sb.sschan)
return
}
}
}

View File

@ -1,7 +1,6 @@
package sectorbuilder package sectorbuilder
import ( import (
"context"
"encoding/binary" "encoding/binary"
"unsafe" "unsafe"
@ -22,8 +21,6 @@ const CommLen = sectorbuilder.CommitmentBytesLen
type SectorBuilder struct { type SectorBuilder struct {
handle unsafe.Pointer handle unsafe.Pointer
sschan chan SectorSealingStatus
} }
type SectorBuilderConfig struct { type SectorBuilderConfig struct {
@ -44,7 +41,6 @@ func New(cfg *SectorBuilderConfig) (*SectorBuilder, error) {
return &SectorBuilder{ return &SectorBuilder{
handle: sbp, handle: sbp,
sschan: make(chan SectorSealingStatus, 32),
}, nil }, nil
} }
@ -60,10 +56,6 @@ func sectorIDtoBytes(sid uint64) [31]byte {
return out return out
} }
func (sb *SectorBuilder) Run(ctx context.Context) {
go sb.pollForSealedSectors(ctx)
}
func (sb *SectorBuilder) Destroy() { func (sb *SectorBuilder) Destroy() {
sectorbuilder.DestroySectorBuilder(sb.handle) sectorbuilder.DestroySectorBuilder(sb.handle)
} }
@ -95,11 +87,6 @@ func (sb *SectorBuilder) GeneratePoSt(sortedCommRs [][CommLen]byte, challengeSee
return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed) return sectorbuilder.GeneratePoSt(sb.handle, sortedCommRs, challengeSeed)
} }
func (sb *SectorBuilder) SealedSectorChan() <-chan SectorSealingStatus {
// is this ever going to be multi-consumer? If so, switch to using pubsub/eventbus
return sb.sschan
}
var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector var UserBytesForSectorSize = sectorbuilder.GetMaxUserBytesPerStagedSector
func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) { func VerifySeal(sectorSize uint64, commR, commD, commRStar []byte, proverID address.Address, sectorID uint64, proof []byte) (bool, error) {

View File

@ -226,7 +226,7 @@ func Online() Option {
// Storage miner // Storage miner
ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner }, ApplyIf(func(s *Settings) bool { return s.nodeType == nodeStorageMiner },
Override(new(*sectorbuilder.SectorBuilder), modules.SectorBuilder), Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
Override(new(*sector.Store), sector.NewStore), Override(new(*sector.Store), sector.NewStore),
Override(new(*storage.Miner), modules.StorageMiner), Override(new(*storage.Miner), modules.StorageMiner),

View File

@ -64,24 +64,6 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS) (*sectorbui
} }
} }
func SectorBuilder(mctx helpers.MetricsCtx, lc fx.Lifecycle, sbc *sectorbuilder.SectorBuilderConfig) (*sectorbuilder.SectorBuilder, error) {
sb, err := sectorbuilder.New(sbc)
if err != nil {
return nil, err
}
ctx := helpers.LifecycleCtx(mctx, lc)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
sb.Run(ctx)
return nil
},
})
return sb, nil
}
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) { func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
maddr, err := minerAddrFromDS(ds) maddr, err := minerAddrFromDS(ds)
if err != nil { if err != nil {

View File

@ -2,13 +2,19 @@ package sector
import ( import (
"context" "context"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"sync" "sync"
"time"
"github.com/filecoin-project/go-lotus/lib/sectorbuilder"
logging "github.com/ipfs/go-log"
) )
var log = logging.Logger("sectorstore")
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder // TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct { type Store struct {
lk sync.Mutex lk sync.Mutex
@ -33,13 +39,36 @@ func (s *Store) Service() {
go s.service() go s.service()
} }
func (s *Store) service() { func (s *Store) poll() {
sealed := s.sb.SealedSectorChan() log.Info("polling for sealed sectors...")
for { // get a list of sectors to poll
select {
case sector := <-sealed:
s.lk.Lock() s.lk.Lock()
toPoll := make([]uint64, 0, len(s.waiting))
for id := range s.waiting {
toPoll = append(toPoll, id)
}
s.lk.Unlock()
var done []sectorbuilder.SectorSealingStatus
// check status of each
for _, sec := range toPoll {
status, err := s.sb.SealStatus(sec)
if err != nil {
log.Errorf("getting seal status: %s", err)
continue
}
if status.SealStatusCode == 0 { // constant pls, zero implies the last step?
done = append(done, status)
}
}
// send updates
s.lk.Lock()
for _, sector := range done {
watch, ok := s.waiting[sector.SectorID] watch, ok := s.waiting[sector.SectorID]
if ok { if ok {
close(watch) close(watch)
@ -48,7 +77,17 @@ func (s *Store) service() {
for _, c := range s.incoming { for _, c := range s.incoming {
c <- sector // TODO: ctx! c <- sector // TODO: ctx!
} }
}
s.lk.Unlock() s.lk.Unlock()
}
func (s *Store) service() {
poll := time.Tick(5 * time.Second)
for {
select {
case <-poll:
s.poll()
case <-s.close: case <-s.close:
s.lk.Lock() s.lk.Lock()
for _, c := range s.incoming { for _, c := range s.incoming {
@ -68,12 +107,14 @@ func (s *Store) AddPiece(ref string, size uint64, r io.Reader, keepAtLeast uint6
if err != nil { if err != nil {
return 0, err return 0, err
} }
s.lk.Lock() s.lk.Lock()
_, exists := s.waiting[sectorID] _, exists := s.waiting[sectorID]
if !exists { if !exists { // pieces can share sectors
s.waiting[sectorID] = make(chan struct{}) s.waiting[sectorID] = make(chan struct{})
} }
s.lk.Unlock() s.lk.Unlock()
return sectorID, nil return sectorID, nil
} }